2013-06-21 30 views
22

związane z moją inne pytanie, ale wyraźny:Spark trybie autonomicznym: Jak kompresować wyjście zapłonową pisemnej do HDFS

someMap.saveAsTextFile("hdfs://HOST:PORT/out") 

Gdybym zapisać RDD do HDFS, jak mogę powiedzieć iskrę do kompresji wyjście z gzip ? W Hadoop, możliwe jest ustawienie

mapred.output.compress = true 

i wybrać algorytm kompresji z

mapred.output.compression.codec = <<classname of compression codec>> 

Jak zrobić to w iskry? Czy to też zadziała?

edit: using iskra-0.7.2

Odpowiedz

20

Sposób saveAsTextFile trwa dodatkowy opcjonalny parametr klasy kodeka aby wykorzystać. Tak dla przykładu powinno być coś takiego w użyciu gzip:

someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec]) 

UPDATE

Ponieważ używasz 0.7.2 może być w stanie do portu kod kompresji poprzez opcje konfiguracyjne, które ustawić przy starcie. Nie jestem pewien, czy to będzie działać dokładnie, ale trzeba iść z tego:

conf.setCompressMapOutput(true) 
conf.set("mapred.output.compress", "true") 
conf.setMapOutputCompressorClass(c) 
conf.set("mapred.output.compression.codec", c.getCanonicalName) 
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) 

do czegoś takiego:

System.setProperty("spark.hadoop.mapred.output.compress", "true") 
System.setProperty("spark.hadoop.mapred.output.compression.codec", "true") 
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") 
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK") 

Jeśli zmusić go do pracy, publikując swój config prawdopodobnie być także pomocnym dla innych.

+0

z której wersji iskry to działa? Używam zapłonie 0.7. 2 i otrzymuję błąd w czasie kompilacji: 'error: zbyt wiele argumentów dla metody saveAsTextFile'. Widziałem, że to było [omawiane] (https://github.com/mesos/spark/pull/645). – ptikobj

+1

Widzę, że jest w najnowszej wersji iskry 0.8.0. Będzie musiał ją wyciągnąć, jak się wydaje, ponieważ jest to dość ważna funkcja. – ptikobj

+0

ah, to ma sens. Pracowałem z oddziałem głównym, a nie 0.7.2. – Noah

2

Innym sposobem na zapisanie skompresowanych plików gzip do systemu plików HDFS lub Amazon S3 jest użycie metody saveAsHadoopFile.

someMap jest RDD [(K v)], jeśli someMap jak RDD [V] można nazwać someMap.map (wiersz => (linia "") do zastosowania metody saveAsHadoopFile.

import org.apache.hadoop.io.compress.GzipCodec 

someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec]) 
+0

Czy można ustawić parametry w podobny sposób w 'spark-defaults.xml' zamiast tego, aby każde zadanie mogło z niego korzystać? Próbowałem replikacji ustawień do 'spark-defaults.xml', ale ustawienia wydają się nie zostać pobrane. – nikk

1

Dla nowszej wersji Spark, należy wykonać następujące czynności w pliku o zapłonie defaults.xml. (mapred jest derecated).

<property> 
    <name>mapreduce.output.fileoutputformat.compress</name> 
    <value>true</value> 
</property> 
<property> 
    <name>mapreduce.output.fileoutputformat.compress.codec</name> 
    <value>GzipCodec</value> 
</property> 
<property> 
    <name>mapreduce.output.fileoutputformat.compress.type</name> 
    <value>BLOCK</value> 
</property>