5

Mam kilka logów serwerowych skompresowanych w S3, i muszę przetworzyć je za pomocą przesyłania strumieniowego na elastycznej MapReduce. Jak mam powiedzieć Amazonowi i Hadoopowi, że dzienniki są już skompresowane (zanim zostaną załadowane do HFS!), Aby można je było zdekompresować przed wysłaniem do skryptu mapowania strumieniowego?Załaduj pliki skompresowane w elastyczny sposób do elastycznej mapyReduce

Jedyną dostępną dokumentacją jest tutaj: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HadoopDataCompression.html#emr-using-snappy , i wydaje się, że dotyczy ona kompresji pośredniej, a nie plików skompresowanych po dotarciu do HFS.

BTW, pracuję głównie w Pythonie, więc punkty bonusowe, jeśli masz rozwiązanie w boto!

Odpowiedz

7

odpowiedź brzmi „nie można zrobić.” Przynajmniej nie w konkretnym przypadku stosowania streamingu hadoopów do plików skompresowanych w zgryźliwy sposób, pochodzących z poza hadoop.

Ja (dokładnie!) Przeanalizowałem dwie główne opcje, aby dojść do tego wniosku: (1) spróbuj użyć wbudowanej kompresji hadoopa, jak sugeruje highlikoinated, lub (2) napisz mój własny moduł do strumieniowania, aby skonsumować i rozpakować snappy akta.

W przypadku opcji (1) wygląda na to, że hadoop dodaje kilka znaczników do plików podczas kompresowania ich za pomocą snappy. Ponieważ moje pliki są kompresowane za pomocą zgrabnego poza hadoop, wbudowany kodek hadoopa nie może rozpakować plików.

Jednym z objawów tego problemu był błąd przestrzeń sterty:

2013-04-03 20:14:49,739 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.OutOfMemoryError: Java heap space 
    at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:102) 
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:82) 
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76) 
    at java.io.InputStream.read(InputStream.java:85) 
    ... 

Kiedy przeszedłem do znacznie większej instancji i odpalił ustawienie mapred.child.java.opts, mam nowy błąd:

java.io.IOException: IO error in map input file s3n://my-bucket/my-file.snappy 

Snapowy kodek Hadoop po prostu nie działa z zewnętrznie wygenerowanymi plikami.

W przypadku opcji (2) problem polega na tym, że streaming w trybie hadoop nie rozróżnia podziałów linii \ n, \ r i \ r \ n. Ponieważ kompresja snappy kończy się zraszaniem tych kodów bajtowych w skompresowanych plikach, jest to fatalne. Oto mój ślad błędu:

2013-04-03 22:29:50,194 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:372) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:586) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    ... 

Przy odrobinie pracy nad klas Java Hadoop (patrz here, na przykład), prawdopodobnie mógłby naprawić \ r vs \ n problemu. Ale, jak powiedziałem na początku, moim celem było zbudowanie w ramach modułu streamingowego, bez dotykania Javy. Z tym ograniczeniem wydaje się, że nie ma sposobu na rozwiązanie tego problemu.

W końcu wróciłem do gości generujących pliki, które ten zestaw zużywa, i przekonałem ich, by przełączyli się na gzip lub lzo.

PS - Opcja On (2), Grałem z dzieleniem rekordów na różne znaki (np. Textinputformat.record.delimiter = X), ale czułem się bardzo hacky i nie działało tak.

PPS - Innym obejściem byłoby napisanie skryptów do pobrania plików z S3, ich dekompresja, a następnie uruchomienie opcji -copyFromLocal w celu przeniesienia ich do HDFS. Z punktu widzenia komputacji nie ma w tym nic złego, ale z perspektywy przepływu pracy wprowadziłoby to wiele kłopotów.

1

Zakładając, że korzystasz z TextInputFormat (lub jednej z jej podklas), skompresowane pliki wejściowe z rozszerzeniem .snappy są obsługiwane automatycznie.

Być może warto rozważyć użycie kompresji Lzo (rozszerzenie .gz) zamiast snappy. Porzucisz trochę szybkości kompresji, aby uzyskać lepszy współczynnik kompresji i plik wejściowy, który można podzielić na warstwy. Cloudera wspomina ten in their blog:

One thing to note is that Snappy is intended to be used with a container format, like Sequence Files or Avro Data Files, rather than being used directly on plain text, for example, since the latter is not splittable and can’t be processed in parallel using MapReduce. This is different to LZO, where is is possible to index LZO compressed files to determine split points so that LZO files can be processed efficiently in subsequent processing.

+0

Słyszę, co mówisz o LZO versus snappy, a dla innych robiących podobne rzeczy w przyszłości, poleciłbym także LZO. W moim przypadku zespół zarządzający pamięcią masową w S3 ma inne powody, dla których wolałby być bardziej agresywny i nie zaszkodzi zbytnio naszemu występowi w hadoopie. Tak więc trzymamy się szybkiej kompresji. – Abe

+0

Wykrywane przez ciebie magiczne rozszerzenie oparte na rozszerzeniu pliku nie działa na wielu wersjach maku. Używam programu AWS EMR AMI 2.3.3, wersja 1.0.3, i nie działa tam. Próbowałem także kilku innych buildów EMR, ale bez radości. – Abe