SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache
.writeStream
.start
.awaitTermination
Podczas wykonywania tej próbki w iskry 2.1.0 dostałem błąd. Bez opcji .cache
działał zgodnie z przeznaczeniem, ale z opcją .cache
mam:Dlaczego korzystanie z pamięci podręcznej podczas przesyłania strumieniowego zestawów danych kończy się niepowodzeniem z "AnalysisException: Zapytania ze źródłami strumieniowymi muszą być wykonywane za pomocą metody writeStream.start()"?
Wyjątek w wątku „główne” org.apache.spark.sql.AnalysisException: Zapytania z źródeł strumieniowych musi być wykonana z writeStream.start (); FileSource [src/test/data] at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ katalizator $ analiza $ UnsupportedOperationChecker $$ throwError (UnsupportedOperationChecker.scala: 196) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala: 35) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1 .apply (UnsupportedOperationChecker.scala: 33) w org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala: 128) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $. checkForBatch (UnsupportedOperationChecker.scala: 33) w org.apache.spark.sql.execution.QueryExecution.assertSupported (QueryExecut ion.scala: 58) at org.apache.spark.sql.execution.QueryExecution.withCachedData $ lzycompute (QueryExecution.scala: 69) at org.apache.spark.sql.execution.QueryExecution.withCachedData (QueryExecution.scala: 67) na org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (QueryExecution.scala: 73) na org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 73) w org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution.scala: 79) at org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 75) at org.apache. spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 84) at org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryEx ecution.scala: 84) w org.apache.spark.sql.execution.CacheManager $$ anonfun $ cacheQuery $ 1.apply (CacheManager.scala: 102) w org.apache.spark.sql.execution.CacheManager.writeLock (CacheManager.scala: 65) w org.apache.spark.sql.execution.CacheManager.cacheQuery (CacheManager.scala: 89) at org.apache.spark.sql.Dataset.persist (Dataset.scala: 2479) at org.apache.spark.sql.Dataset.cache (Dataset.scala: 2489) w org.me.App $ .main (App.scala: 23) at org.me.App.main (App.scala)
Każdy pomysł?
Przepraszam, ale nie sądzę, że po prostu nie należy używać pamięci podręcznej. –
Martin, zapraszam do udziału w komentarzach do [SPARK-20927] (https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels % 3Acomment-tabpanel # comment-16334363) o potrzebie buforowania obliczeń strumieniowych – mathieu