2016-04-13 46 views
7

Mam następujący problem: załóżmy, że mam katalog zawierający skompresowane katalogi, które zawierają wiele plików, przechowywanych na HDFS. Chcę utworzyć RDD składają kilka obiektów typu T, tj:Czytaj całe pliki tekstowe z kompresji w Spark

context = new JavaSparkContext(conf); 
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); 

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath); 
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> { 
    // The name of the file 
    String fileName = fileNameContent._1(); 
    // The content of the file 
    String content = fileNameContent._2(); 

    // Class T has a constructor of taking the filename and the content of each 
    // processed file (as two strings) 
    T t = new T(content, fileName); 

    return t; 
}); 

Teraz kiedy inputDataPath jest katalogiem zawierającym pliki to działa perfekcyjnie, to znaczy, kiedy to coś jak:

String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders 

ale gdy jest plik zawierający wiele plików, zawartość pliku (fileNameContent._2()) dostarcza mi trochę niepotrzebnego ciągu binarnego (całkiem spodziewany). Znalazłem similar question on SO, ale nie jest to ten sam przypadek, ponieważ istnieje rozwiązanie, gdy każda kompresja składa się tylko z jednego pliku, aw moim przypadku istnieje wiele innych plików, które chcę odczytywać pojedynczo jako całe pliki. Znalazłem także question o wholeTextFiles, ale to nie działa w moim przypadku.

Jakieś pomysły, jak to zrobić?

EDIT:

Próbowałem z czytnikiem z here (próbuje przetestować czytelnika z here, podobnie jak w funkcji testTarballWithFolders()), ale gdy zgłoszę

TarballReader tarballReader = new TarballReader(fileName); 

i mam NullPointerException :

java.lang.NullPointerException 
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83) 
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77) 
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91) 
    at utils.TarballReader.<init>(TarballReader.java:61) 
    at main.SparkMain.lambda$0(SparkMain.java:105) 
    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Linia 105 w MainSpark jest jeden Pokazałem górna w mojej edycji posta, a linia 61 z TarballReader jest

GZIPInputStream gzip = new GZIPInputStream(in); 

co daje wartość null dla strumienia wejściowego in w górnym wierszu:

InputStream in = this.getClass().getResourceAsStream(tarball); 

Am I na właściwej ścieżce tutaj? Jeśli tak, jak mam kontynuować? Dlaczego otrzymuję tę wartość null i jak mogę to naprawić?

+0

nie u zobacz: http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles –

+0

Tak, napisałem na moje pytanie - to nie działa w moim przypadku. W 'ścieżkach' ponownie uzyskuję ścieżkę kompresji, która jest bezużyteczna. – Belphegor

+0

To jest duplikat "Co to jest wyjątek typu Null Pointer i jak tego uniknąć" –

Odpowiedz

17

Jednym z możliwych rozwiązań jest ręczne odczytanie danych za pomocą binaryFiles.

Scala:

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream 
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream 
import org.apache.spark.input.PortableDataStream 
import scala.util.Try 
import java.nio.charset._ 

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try { 
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open)) 
    Stream.continually(Option(tar.getNextTarEntry)) 
    // Read until next exntry is null 
    .takeWhile(_.isDefined) 
    // flatten 
    .flatMap(x => x) 
    // Drop directories 
    .filter(!_.isDirectory) 
    .map(e => { 
     Stream.continually { 
     // Read n bytes 
     val buffer = Array.fill[Byte](n)(-1) 
     val i = tar.read(buffer, 0, n) 
     (i, buffer.take(i))} 
     // Take as long as we've read something 
     .takeWhile(_._1 > 0) 
     .map(_._2) 
     .flatten 
     .toArray}) 
    .toArray 
} 

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = 
    new String(bytes, StandardCharsets.UTF_8) 

sc.binaryFiles("somePath").flatMapValues(x => 
    extractFiles(x).toOption).mapValues(_.map(decode())) 
libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11" 

Pełny przykład użycia z Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Python:

import tarfile 
from io import BytesIO 

def extractFiles(bytes): 
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz") 
    return [tar.extractfile(x).read() for x in tar if x.isfile()] 

(sc.binaryFiles("somePath") 
    .mapValues(extractFiles) 
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs])) 
+0

Dziękuję za odpowiedź, ale pracuję z Javą (zobacz tagi + kod). Czy mógłbyś podać odpowiedź w Javie? W rzeczywistości nie mogę go przetestować i sprawdzić, czy działa (reszta mojego kodu jest w Javie). – Belphegor

+0

@Belphegor To już jest kod zgodny z Java. Pełny przykład pracy https://bitbucket.org/zero323/spark-multifile-targz-extract – zero323

+0

Arghhh ... Nie wiedziałem o 'binaryFiles' (jakoś ominąłem to z twojej odpowiedzi). W każdym razie, ten kierunek działa, wprowadziłem kilka poprawek do mojego oryginalnego kodu i działa dobrze. Dzięki! – Belphegor