2016-12-13 49 views
13

Używam Spark 1.5.Scala Spark: DataFrames + dołącz: java.util.concurrent.TimeoutException: Futures przekroczono limit czasu po [300 sekundach]

Mam dwa dataframes postać:

scala> libriFirstTable50Plus3DF 
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int] 

scala> linkPersonItemLessThan500DF 
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int] 

libriFirstTable50Plus3DF ma 766151 rekordy podczas linkPersonItemLessThan500DF ma 26694353 rekordy. Zauważ, że używam repartition(number) na linkPersonItemLessThan500DF, ponieważ zamierzam dołączyć do tych dwóch później. Śledzę się powyższy kod z:

val userTripletRankDF = linkPersonItemLessThan500DF 
    .join(libriFirstTable50Plus3DF, Seq("family_id")) 
    .take(20) 
    .foreach(println(_)) 

dla którego jestem coraz to wyjście:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200) 
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] 
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:  at scala.concurrent.Await$.result(package.scala:107) 
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190) 
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) 
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) 
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) 
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904) 
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385) 
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315) 
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378) 
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) 
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402) 
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) 
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87) 
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) 
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91) 
at $iwC$$iwC$$iwC.<init>(<console>:93) 
at $iwC$$iwC.<init>(<console>:95) 
at $iwC.<init>(<console>:97) 
at <init>(<console>:99) 
at .<init>(<console>:103) 
at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
at org.apache.spark.repl.Main$.main(Main.scala:31) 
at org.apache.spark.repl.Main.main(Main.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

a ja nie rozumiem, co jest problemem. Czy to tak proste, jak zwiększenie czasu oczekiwania? Czy połączenie jest zbyt intensywne? Czy potrzebuję więcej pamięci? Czy shufffling jest intensywny? Czy ktoś może pomóc?

+0

pozwól mi wspomnieć moją odpowiedź na bardzo podobne pytanie: https://stackoverflow.com/a/48449467/418293 – mathieu

Odpowiedz

16

Dzieje się tak, ponieważ Spark próbuje przeprowadzić rozgałęzienie rozgłaszania Broadcast i jedna z ramek DataFrames jest bardzo duża, więc wysłanie jej pochłania dużo czasu.

Można:

  1. Set wyższy spark.sql.broadcastTimeout zwiększyć timeout - spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist() oba DataFrames, potem iskra użyje Shuffle Dołącz - odwołanie od here
+0

NIC dziękuję - przeczytaj kilka blogów na temat rozwiązywania problemów iskry dla ciężkich obciążeń i myślę, że będę w stanie to zrobić do jutra. Jedno jednak: czy możesz poprawić swoją odpowiedź, wyjaśniając, w jaki sposób mogę skonfigurować samodzielną aplikację iskrową, aby zmienić "spark.sql.broadcastTimeout"? Innymi słowy, co mogę dodać do tego: 'val conf = new SparkConf() setAppName (APP_NAME)' 'val sc = new SparkContext (conf)' .? –

+0

@ ΧρίστοςΧατζηνικολή Dodałem, jak ustawić wartość :) –