2015-03-13 9 views
6

używam następny kod:Jak mogę scalić iskrzenie wyników plików bez ponownej partycji i copyMerge?

csv.saveAsTextFile(pathToResults, classOf[GzipCodec]) 

pathToResults katalog ma wiele plików, takich jak część-0000, cz-0001 itp mogę używać FileUtil.copyMerge(), ale jest to bardzo powolny, to pobierz wszystkie pliki na program sterownika, a następnie prześlij je w trybie hadoop. Ale FileUtil.copyMerge() szybciej niż:

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec]) 

Jak mogę połączyć wyniki zapłonowe plików bez podziale i FileUtil.copyMerge()?

Odpowiedz

8

Niestety, nie ma innej opcji, aby uzyskać pojedynczy plik wyjściowy w Sparku. Zamiast repartition(1) można użyć coalesce(1), ale z parametrem 1 ich zachowanie będzie takie samo. Spark będzie zbierać twoje dane w jednej partycji w pamięci, co może spowodować błąd OOM, jeśli twoje dane są zbyt duże.

Inną opcją scalania plików na HDFS może być napisanie prostego zadania MapReduce (lub zadania Pig, lub zadania Hadoop Streaming), które dostarczyłoby cały katalog jako dane wejściowe i użycie jednego reduktora wygeneruje pojedynczy plik wyjściowy. Należy jednak pamiętać, że w przypadku podejścia MapReduce wszystkie dane zostaną najpierw skopiowane do lokalnego systemu plików reduktora, co może spowodować błąd "braku miejsca".

Oto kilka przydatnych linków na ten sam temat:

0

pełniły dokładnie to samo pytanie i musiał napisać kod pySpark (z wywołaniami Hadoop API), który implementuje copyMerge:

https://github.com/Tagar/stuff/blob/master/copyMerge.py

Niestety copyMerge jako samodzielne wywołanie interfejsu API Hadoop zostanie wycofane i usunięte w Hadoop 3.0. Taka implementacja nie zależy od copyMerge firmy Hadoop (ponownie ją implementuje).