2016-11-07 22 views
9

pracuję nad programem Spark SQL, a ja dostaję następujący wyjątek:Jakie są możliwe przyczyny otrzymania TimeoutException: Futures timed out po [n sekund] podczas pracy z Spark

16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds] 
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:190) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144) 
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) 
    at scala.collection.immutable.List.map(List.scala:285) 
    at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41) 
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93) 
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60) 
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84) 
    at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581) 
    at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590) 
    at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56) 
    at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32) 
    at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52) 
    at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15) 
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54) 
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54) 
    at scala.Option.getOrElse(Option.scala:121) 
    at com.somecompany.ml.Main$.main(Main.scala:46) 
    at com.somecompany.ml.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]) 

Ostatni część mojego kodu, który rozpoznaję ze śledzenia stosu, to com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56), która doprowadza mnie do tej linii: profilesDF.cache() Przed buforowaniem wykonuję połączenie między dwiema ramkami danych. Widziałem odpowiedź na temat utrzymywania obu ramek danych przed dołączeniem here Nadal muszę buforować ujednoliconą ramkę danych, ponieważ używam go w kilku moich transformacjach.

I zastanawiałem się, co może spowodować wyrzucenie tego wyjątku ? Poszukując go, otworzyłem link do wyjątku rpc timeout lub problemów z bezpieczeństwem, który nie jest moim problemem. Jeśli masz pojęcie, jak go rozwiązać, oczywiście to doceniam, ale nawet zrozumienie problemu pomoże mi rozwiązać go

góry dzięki

+0

Hej, w końcu to wymyśliłeś?Próbowałem zwiększyć limit czasu sieci Spark, ale nie rozwiązało problemu. Zaczynam myśleć, że muszę się rozdzielić po zjednoczeniu dwóch moich RDD, ale nie miałem okazji jeszcze tego spróbować. Mój program działa z małymi zestawami danych, ale zrywa, gdy zacznę używać więcej. – frosty

+0

@frosty zapoznaj się z moją odpowiedzią poniżej. To może pomóc – mathieu

Odpowiedz

10

Pytanie: zastanawiałem się, co może być przyczyną tego wyjątku zostać wyrzucony?

Odpowiedź:

spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins

spark.network.timeout 120s Domyślny limit czasu dla wszystkich interakcji sieciowych .. spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max (jeśli używasz Kryo serializacji), itd. Są dostrojone z wartości większe niż domyślne w w celu obsługi złożonych zapytań. Możesz zacząć od tych wartości i dostosuj odpowiednio do swoich obciążeń SQL.

Uwaga: Doc says that

następujące opcje (. Zobaczyć spark.sql właściwości) mogą być również wykorzystywane do strojenia wydajności realizacji zapytań. Jest możliwe, że te opcje zostaną wycofane w przyszłej wersji, ponieważ więcej optymalizacji zostanie przeprowadzonych automatycznie. *

Ponadto, dla lepszego zrozumienia, można zobaczyć BroadCastHashJoin, gdzie metoda wykonywania jest punktem wyzwalającym dla powyższego śledzenia stosu.

protected override def doExecute(): RDD[Row] = { 
    val broadcastRelation = Await.result(broadcastFuture, timeout) 

    streamedPlan.execute().mapPartitions { streamedIter => 
     hashJoin(streamedIter, broadcastRelation.value) 
    } 
    } 
0

Dobrze wiedzieć, że sugestia z Ram działa w niektórych przypadkach. Chciałbym wspomnieć, że kilka razy natknąłem się na ten wyjątek (w tym ten opisany here).

Znaczna część czasu była spowodowana niemal bezgłośnymi OOM na niektórych executorach. Sprawdź SparkUI pod kątem nieudanych zadań, ostatnia kolumna tej tabeli: task panel for a stage in SparkUI Możesz zauważyć komunikaty OOM.

Jeśli dobrze zrozumiesz iskrzenie wewnętrzne, transmitowane dane przechodzą przez sterownik. Sterownik ma więc mechanizm wątku, który zbiera dane z executorów i odsyła je do wszystkich. Jeśli w jakimś momencie executor ulegnie awarii, możesz skończyć z tymi limitami czasu.