2014-10-10 15 views
9

Próbuję użyć Apache Spark SQL do danych logów jls etl w S3 do plików Parquet również na S3. Moje kodu jest zasadniczo:Spark SQL nie może zakończyć pisania danych Parkietu z dużą liczbą odłamków

import org.apache.spark._ 
val sqlContext = sql.SQLContext(sc) 
val data = sqlContext.jsonFile("s3n://...", 10e-6) 
data.saveAsParquetFile("s3n://...") 

Ten kod działa, gdy mam do 2000 partycji i nie do 5000 lub więcej, niezależnie od ilości danych. Normalnie można po prostu zlewają partycje do akceptowalnego numeru, ale jest to bardzo duży zbiór danych i partycji w 2000 uderzę problem opisać w tym question

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s 
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s 
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ... 
java.io.IOException: Could not read footer: java.lang.NullPointerException 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190) 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203) 
     at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409) 
     at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77) 
     at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54) 
     at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56) 
     at $line37.$read$$iwC$$iwC.<init>(<console>:58) 
     at $line37.$read$$iwC.<init>(<console>:60) 
     at $line37.$read.<init>(<console>:62) 
     at $line37.$read$.<init>(<console>:66) 
     at $line37.$read$.<clinit>(<console>) 
     at $line37.$eval$.<init>(<console>:7) 
     at $line37.$eval$.<clinit>(<console>) 
     at $line37.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
     at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NullPointerException 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106) 
     at java.io.BufferedInputStream.close(BufferedInputStream.java:472) 
     at java.io.FilterInputStream.close(FilterInputStream.java:181) 
     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Używam tego na zapłonowej-1.1.0 na R3.xlarge w ec2. Używam konsoli iskrowo-powłokowej do uruchomienia powyższego kodu. Jestem w stanie wykonać nietrywialne zapytania na obiekcie SchemaRDD data, więc nie wydaje się, że jest to problem z zasobami. Możliwe jest również odczytanie i zapytanie o wynikowy plik Parkietu, który zajmuje wyjątkowo dużo czasu z powodu braku plików podsumowania.

+1

Chciałbym zgłosić błąd na ten temat. https://issues.apache.org/jira/browse/SPARK/ –

Odpowiedz

0

spróbuj ustawić tę właściwość jako fałsz:

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");