2016-10-16 46 views
6

Mam RDD, którego elementy są typu (Long, String). Z jakiegoś powodu chcę zapisać cały RDD w HDFS, a później także przeczytać RDD z powrotem w programie Spark. czy jest to możliwe do zrobienia? A jeśli tak, to w jaki sposób?Jak zapisać RDD w HDFS, a potem go odczytać?

Odpowiedz

5

Jest to możliwe.

W RDD masz funkcje saveAsObjectFile i saveAsTextFile. Krotki są zapisywane jako (value1, value2), dzięki czemu można je później parsować.

Reading można zrobić z textFile funkcji z SparkContext a następnie .map wyeliminować ()

Więc: Wersja 1:

rdd.saveAsTextFile ("hdfs:///test1/"); 
// later, in other program 
val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => { 
    // here remove() and parse long/strings 
}) 

Wersja 2:

rdd.saveAsObjectFile ("hdfs:///test1/"); 
// later, in other program - watch, you have tuples out of the box :) 
val newRdds = sparkContext.sc.sequenceFile("hdfs:///test1/part-*", classOf[Long], classOf[String]) 
+0

ślubować, że jest miłe rozwiązanie :). Ale jak odczytujemy za pomocą textFile, ponieważ saveAsText tworzyłoby wiele różnych plików. – pythonic

+0

@pythonic Zobacz moją aktualizację - możesz odczytać zakres pliku. Każda część RDD zapisywana jest w pliku 'part-XYZŹŻ', dzięki czemu możemy odczytać każdy plik o takiej nazwie –

3

polecam użyj DataFrame, jeśli Twój RDD ma format tabelaryczny. Ramka danych to tabela lub dwuwymiarowa struktura tablicowa, w której każda kolumna zawiera pomiary jednej zmiennej, a każdy wiersz zawiera jeden przypadek. a DataFrame ma dodatkowe metadane ze względu na format tabelaryczny, który pozwala Spark na uruchomienie pewnych optymalizacji w sfinalizowanym zapytaniu. gdzie RDD jest odpornym, rozproszonym zestawem danych, który jest bardziej blackboxem lub rdzeniem abstrakcji danych, których nie można zoptymalizować. Można jednak przejść z DataFrame do RDD i odwrotnie, a można przejść z RDD do DataFrame (jeśli RDD jest w formacie tabelarycznym) za pomocą metody toDF.

Poniżej znajduje się przykład utworzyć/zapisać DataFrame w formacie CSV i parkiet HDFS,

val conf = { 
    new SparkConf() 
    .setAppName("Spark-HDFS-Read-Write") 
} 

val sqlContext = new SQLContext(sc) 

val sc = new SparkContext(conf) 

val hdfs = "hdfs:///" 
val df = Seq((1, "Name1")).toDF("id", "name") 

// Writing file in CSV format 
df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv") 

// Writing file in PARQUET format 
df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details") 

// Reading CSV files from HDFS 
val dfIncsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv") 

// Reading PQRQUET files from HDFS 
val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details")