10
SparkSession 
    .builder 
    .master("local[*]") 
    .config("spark.sql.warehouse.dir", "C:/tmp/spark") 
    .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint") 
    .appName("my-test") 
    .getOrCreate 
    .readStream 
    .schema(schema) 
    .json("src/test/data") 
    .cache 
    .writeStream 
    .start 
    .awaitTermination 

Podczas wykonywania tej próbki w iskry 2.1.0 dostałem błąd. Bez opcji .cache działał zgodnie z przeznaczeniem, ale z opcją .cache mam:Dlaczego korzystanie z pamięci podręcznej podczas przesyłania strumieniowego zestawów danych kończy się niepowodzeniem z "AnalysisException: Zapytania ze źródłami strumieniowymi muszą być wykonywane za pomocą metody writeStream.start()"?

Wyjątek w wątku „główne” org.apache.spark.sql.AnalysisException: Zapytania z źródeł strumieniowych musi być wykonana z writeStream.start (); FileSource [src/test/data] at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ katalizator $ analiza $ UnsupportedOperationChecker $$ throwError (UnsupportedOperationChecker.scala: 196) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala: 35) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1 .apply (UnsupportedOperationChecker.scala: 33) w org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala: 128) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $. checkForBatch (UnsupportedOperationChecker.scala: 33) w org.apache.spark.sql.execution.QueryExecution.assertSupported (QueryExecut ion.scala: 58) at org.apache.spark.sql.execution.QueryExecution.withCachedData $ lzycompute (QueryExecution.scala: 69) at org.apache.spark.sql.execution.QueryExecution.withCachedData (QueryExecution.scala: 67) na org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (QueryExecution.scala: 73) na org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 73) w org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution.scala: 79) at org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 75) at org.apache. spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 84) at org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryEx ecution.scala: 84) w org.apache.spark.sql.execution.CacheManager $$ anonfun $ cacheQuery $ 1.apply (CacheManager.scala: 102) w org.apache.spark.sql.execution.CacheManager.writeLock (CacheManager.scala: 65) w org.apache.spark.sql.execution.CacheManager.cacheQuery (CacheManager.scala: 89) at org.apache.spark.sql.Dataset.persist (Dataset.scala: 2479) at org.apache.spark.sql.Dataset.cache (Dataset.scala: 2489) w org.me.App $ .main (App.scala: 23) at org.me.App.main (App.scala)

Każdy pomysł?

+1

Przepraszam, ale nie sądzę, że po prostu nie należy używać pamięci podręcznej. –

+1

Martin, zapraszam do udziału w komentarzach do [SPARK-20927] (https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels % 3Acomment-tabpanel # comment-16334363) o potrzebie buforowania obliczeń strumieniowych – mathieu

Odpowiedz

10

Państwa (bardzo ciekawe) sprawa sprowadza się do następnej linii (które można wykonać w spark-shell):

scala> :type spark 
org.apache.spark.sql.SparkSession 

scala> spark.readStream.text("files").cache 
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[files] 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104) 
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68) 
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92) 
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603) 
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613) 
    ... 48 elided 

Powodem tego okazało się całkiem proste do wyjaśnienia (gra słów nie iskra SQL na explain przeznaczone).

tworzy tak zwany zbiór danych strumieniowych .

scala> val files = spark.readStream.text("files") 
files: org.apache.spark.sql.DataFrame = [value: string] 

scala> files.isStreaming 
res2: Boolean = true 

Streaming zbiorów danych są podstawą Spark SQL's Structured Streaming.

Jak można przeczytać w Structured żywo na Quick Example:

a następnie uruchomić obliczenia streamingu za pomocą start().

Cytowanie scaladoc z DataStreamWriter na start:

start(): StreamingQuery Rozpoczyna wykonanie zapytania strumieniowego, który będzie nieustannie wyjściowe wyniki do danej ścieżki jak przybywa nowych danych.

Tak, trzeba użyć start (lub foreach), aby rozpocząć wykonywanie kwerendy streamingu. Już to wiedziałeś.

... Ale tam Streaming strukturyzowane to Unsupported Operations:

Ponadto, istnieje kilka metod zbiór danych, który nie będzie działać na strumieniowe zbiorów danych. Są to akcje, które natychmiast uruchamiają zapytania i zwracają wyniki, co nie ma sensu w przypadku przesyłania strumieniowego zestawu danych.

Jeśli spróbujesz którejkolwiek z tych operacji, zobaczysz wyjątek AnalysisException, taki jak "operacja XYZ nie jest obsługiwana przez strumieniowanie DataFrames/Datasets".

To wygląda znajomo, , prawda?

cache jest nie w liście nieobsługiwanych operacji, ale to dlatego, że po prostu został pominięty (Zgłosiłem SPARK-20927 go naprawić).

powinien znajdować się na liście, ponieważ robi wykonać zapytanie, zanim zapytanie zostanie zarejestrowane w CacheManager Spark SQL.

Chodźmy głębiej w głąb Spark SQL ... wstrzymaj oddech ...

cacheispersist podczas persistrequests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this) 

Chociaż buforowanie kwerendy CacheManagerrobiexecute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan 

które my znam jest niedozwolone, ponieważ jest to start (lub foreach), aby to zrobić.

Problem rozwiązany!

+1

Myślałem, że to jest błąd, więc zgłosiłem go jeszcze wcześniej https://issues.apache.org/jira/browse/SPARK-20865, i po prostu potrzebowałem potwierdzenia moich trudnych sytuacji. Dzięki. –

+0

Link do mastera nie jest istotny, ponieważ kod docelowy może się zmienić. I myślę, że to jest to, co dodaje się do twoich linków – crak

+0

@crak Prawidłowy. Nie powinienem był używać wzorca dla linków. Jak myślisz, co byłoby lepsze? W przeszłości widziałem linki do konkretnych wersji, ale nie wiem, jak to zrobić dzisiaj na github. Umysł, aby zaoferować pomoc? Byłbym wdzięczny. –