Próbuję ustawić spark.sql.parquet.output.committer.class i nic, co robię, nie wydaje się, aby ustawienie zadziałało.Jak ustawić spark.sql.parquet.output.committer.class w pyspark
Próbuję mieć wiele wątków zapisu do tego samego folderu wyjściowego, który będzie działać z org.apache.spark.sql. parquet.DirectParquetOutputCommitter
, ponieważ nie będzie używać folderu _temporary
. Dostaję następujący błąd, który jak wiem, że to nie działa:
Caused by: java.io.FileNotFoundException: File hdfs://path/to/stuff/_temporary/0/task_201606281757_0048_m_000029/some_dir does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:795)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:853)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:849)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:849)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:382)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
Uwaga wywołanie org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob
klasa domyślna.
Próbowałem następujące, w oparciu o inne tak odpowiedzi i zapytania:
sc._jsc.hadoopConfiguration().set(key, val)
(to działa na ustawieniach takich jakparquet.enable.summary-metadata
)dataframe.write.option(key, val).parquet
- Dodajesz
--conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter"
na wezwaniespark-submit
- Dodawanie
--conf "spark.sql.parquet.output.committer.class"=" org.apache.spark.sql.parquet.DirectParquetOutputCommitter"
do połączeniaspark-submit
.
To wszystko, co udało mi się znaleźć, i nic nie działa. Wygląda na to, że nie jest to trudne dla set in Scala, ale wydaje się niemożliwe w Pythonie.
Problem Github: https://github.com/apache/spark/pull/12229 – ksindi