2015-06-26 21 views
5

Kiedy uruchomić kod, takich jak:Spark RDD checkpoint na trwali/buforowane RDD przeprowadzasz DAG dwukrotnie

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER) 
newRDD.checkpoint 
print(newRDD.count()) 

i obserwować etapy przędza, zauważam, że Spark robi obliczenia DAG dwa razy - - raz dla odrębnego + licznika, który zmaterializuje RDD i zapisuje go w pamięci podręcznej, a następnie całkowicie DRUGI czas na utworzenie sprawdzonej kopii.

Ponieważ RDD jest już zmaterializowany i zapisany w pamięci podręcznej, dlaczego punkt kontrolny nie wykorzystuje tego po prostu i zapisuje buforowane partycje na dysku?

Czy istnieje sposób (jakieś ustawienie konfiguracji lub zmiana kodu), aby zmusić Sparka do skorzystania z tego i tylko uruchomić operację RAZ, a punkty kontrolne po prostu skopiują rzeczy?

Czy zamiast tego muszę "zmaterializować"?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER) 
print(newRDD.count()) 

newRDD.checkpoint 
print(newRDD.count()) 

Utworzyliśmy Apache Spark Jira bilet, aby ten wniosek cecha: https://issues.apache.org/jira/browse/SPARK-8666

Odpowiedz