2015-11-10 6 views
7

Chcę uruchomić zadanie iskrzenia (iskra v1.5.1) na niektórych wygenerowanych ścieżkach S3 zawierających pliki avro. Ładuję je:Jak umożliwić iskrowi ignorowanie brakujących plików wejściowych?

val avros = paths.map(p => sqlContext.read.avro(p)) 

Niektóre ścieżki nie istnieją. Jak mogę uzyskać iskrę, aby zignorować te puste ścieżki? Poprzednio używałem this answer, ale nie jestem pewien, jak tego użyć z nowym interfejsem API danych.

Uwaga: Idealnie szukam podobnego podejścia do połączonej odpowiedzi, która sprawia, że ​​ścieżki wejściowe są opcjonalne. Nie chcę szczególnie wyraźnie sprawdzać istnienia ścieżek w S3 (ponieważ jest to uciążliwe i może utrudniać programowanie), ale myślę, że to jest mój powrót, jeśli nie ma żadnego prostego sposobu na zaimplementowanie tego.

Odpowiedz

9

Chciałbym użyć typu scala Try w celu obsługi możliwości awarii podczas odczytu katalogu plików avro. Dzięki "Try" możemy w naszym kodzie jawnie ujawnić możliwość wystąpienia błędu i obsłużyć go w sposób funkcjonalny:

object Main extends App { 

    import scala.util.{Success, Try} 
    import org.apache.spark.{SparkConf, SparkContext} 
    import com.databricks.spark.avro._ 

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    //the first path exists, the second one doesn't 
    val paths = List("/data/1", "/data/2") 

    //Wrap the attempt to read the paths in a Try, then use collect to filter 
    //and map with a single partial function. 
    val avros = 
    paths 
     .map(p => Try(sqlContext.read.avro(p))) 
     .collect{ 
     case Success(df) => df 
     } 
    //Do whatever you want with your list of dataframes 
    avros.foreach{ df => 
    println(df.collect()) 
    } 
    sc.stop() 
} 
+0

Z dokumentacji iskry: 'collect()' może spowodować, że sterownikowi zabraknie pamięci, jednak, ponieważ 'collect()' pobiera cały RDD do pojedynczego komputera. Czy istnieje rozwiązanie bez użycia 'collect()'? Dotyczy to bardzo dużego zbioru danych. – jbrown

+2

Jest to prawdą, gdy funkcja 'collect()' jest wywoływana na RDD. Gdzie nazywam 'collect (...)' po raz pierwszy, zawierającym funkcję częściową, znajduje się na liście RDD, jest funkcją kolekcjonowania na Liście, a nie na RDD. Jest to równoważne wykonaniu 'map' i' filter'. Używam 'collect()' ponownie na końcu wewnątrz 'foreach' na końcu, ale to tylko przykład działania na liście RDD, nie spodziewam się, że zrobiłbyś to we własnej aplikacji, ale ja Potrzebowałem prostego zakończenia, aby zobaczyć, że podejście zadziałało poprawnie. – mattinbits

+0

Oh OK. Spróbuję i zobaczę, czy to działa. Myślałem, że pierwszym "zbieraniem" była ocena RDD i wysłanie wszystkich danych do węzła sterownika. – jbrown