EksperymentSpark SQL: Dlaczego dwa zadania dla jednego zapytania?
Próbowałem następujący fragment na Spark 1.6.1
.
val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")
Physical Plan
jest:
== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(cnt#59L ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
+- TungstenExchange hashpartitioning(dpHour#38,200), None
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
+- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder
dla tego zapytania, mam dwa zadania: Job 9
i Job 10
Dla Job 9
The DAG
jest:
Dla Job 10
The DAG
jest:
Obserwacje
- Podobno istnieją dwa
jobs
dla jednego zapytania. Stage-16
(oznaczone jakoStage-14
wJob 9
) jest pomijane wJob 10
.Stage-15
's ostatniRDD[48]
, jest taki sam jakStage-17
z ostatnichRDD[49]
. How? Widziałem w dziennikach, że poStage-15
egzekucji,RDD[48]
jest zarejestrowany jakoRDD[49]
Stage-17
jest pokazane wdriver-logs
ale nigdy nie zrealizowanych naExecutors
. Nadriver-logs
pokazano wykonanie zadania, ale gdy przyjrzałem się dziennikom konteneraYarn
, nie było żadnych dowodów na to, że otrzymanotask
zStage-17
.
Logi wspierające te obserwacje (tylko driver-logs
, Zgubiłem executor
logi z powodu późniejszej awarii). Okazuje się, że przed rozpoczęciem Stage-17
, RDD[49]
jest zarejestrowany:
16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200)
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200)
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB)
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB)
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB)
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26)
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes)
pytania
- Dlaczego dwa
Jobs
? Jaki jest zamysł, przełamującDAG
na dwajobs
? Job 10
'sDAG
wygląda kompletny do wykonania kwerendy. Czy robi coś konkretnegoJob 9
?- Dlaczego
Stage-17
nie jest pomijany? Wygląda na to, że tworzone są manekinytasks
, czy mają jakiś cel. Później próbowałem jeszcze prostszego zapytania.Niespodziewanie tworzył on 3
Jobs
.sqlContext.sql ("wybierz dpHour z tak aby przez dphour"). Write.parquet ("/ OUT2 /")
To ciekawe, dlaczego drugie etapy pracy nie mogą znaleźć się w pierwszej pracy? –
Dobre pytanie. Może to mieć związek z generowaniem wyniku pośredniego. Ważne pytanie brzmi: dlaczego ma znaczenie, w jaki sposób DAG jest mapowane na etapy i miejsca pracy? – Sim
Tak, trudno jest naprawdę zrozumieć, w jaki sposób Spark to robi, połączenie dostępnych zasobów, danych .... –