2015-09-17 4 views
6

Chcę zapisać (jako plik z parkietu) element Spark DataFrame zawierający niestandardową klasę jako kolumnę. Ta klasa składa się z Seq innej niestandardowej klasy. Aby to zrobić, tworzę klasę UserDefinedType dla każdej z tych klas, w sposób podobny do VectorUDT. Mogę pracować z ramką danych zgodnie z zamierzeniami, ale nie mogę zapisać jej na dysku jako parkiet (lub jason). Zgłosiłem to jako błąd, ale być może jest problem z moim kodem. I zostały wdrożone prostszy przykład, aby pokazać problem:Zapisywanie danych typu Spark DataFrame z zagnieżdżonymi typami danych użytkownika

import org.apache.spark.sql.SaveMode 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow 
import org.apache.spark.sql.types._ 

@SQLUserDefinedType(udt = classOf[AUDT]) 
case class A(list:Seq[B]) 

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) 
    override def userClass: Class[A] = classOf[A] 
    override def serialize(obj: Any): Any = obj match { 
    case A(list) => 
     val row = new GenericMutableRow(1) 
     row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) 
    } 
    } 
} 

object AUDT extends AUDT 

@SQLUserDefinedType(udt = classOf[BUDT]) 
case class B(num:Int) 

class BUDT extends UserDefinedType[B] { 
    override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) 
    override def userClass: Class[B] = classOf[B] 
    override def serialize(obj: Any): Any = obj match { 
    case B(num) => 
     val row = new GenericMutableRow(1) 
     row.setInt(0, num) 
     row 
    } 

    override def deserialize(datum: Any): B = { 
    datum match { 
     case row: InternalRow => new B(row.getInt(0)) 
    } 
    } 
} 

object BUDT extends BUDT 

object TestNested { 
    def main(args:Array[String]) = { 
    val col = Seq(new A(Seq(new B(1), new B(2))), 
        new A(Seq(new B(3), new B(4)))) 

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(1 to 2 zip col).toDF() 
    df.show() 

    df.write.mode(SaveMode.Overwrite).save(...) 
    } 
} 

Wynika to w następujący błąd:

15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):

Jeżeli zapisać dataframe z B zamiast bez problemu istnieje od B jako nie zagnieżdżona klasa niestandardowa. Czy czegoś brakuje?

Odpowiedz

2

Musiałem wprowadzić cztery zmiany w kodzie, aby działało (testowane w Spark 1.6.0 na Linuksie) i myślę, że mogę głównie wyjaśnić, dlaczego są one potrzebne. Zastanawiam się jednak, czy istnieje prostsze rozwiązanie. Wszystkie zmiany są w AUDT, w następujący sposób:

  1. Przy definiowaniu sqlType, sprawiają, że zależą BUDT.sqlType, raczej niż tylko BUDT.
  2. W serialize(), wywołaj BUDT.serialize() na każdym elemencie listy.
  3. W deserialize():
    • wezwanie toArray(BUDT.sqlType) zamiast toArray(BUDT)
    • wezwanie BUDT.deserialize() na każdym elemencie

Oto otrzymany kod:

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = 
    StructType(
     Seq(StructField("list", 
         ArrayType(BUDT.sqlType, containsNull = false), 
         nullable = true))) 

    override def userClass: Class[A] = classOf[A] 

    override def serialize(obj: Any): Any = 
    obj match { 
     case A(list) => 
     val row = new GenericMutableRow(1) 
     val elements = 
      list.map(_.asInstanceOf[Any]) 
       .map(e => BUDT.serialize(e)) 
       .toArray 
     row.update(0, new GenericArrayData(elements)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => 
     val first = row.getArray(0) 
     val bs:Array[InternalRow] = first.toArray(BUDT.sqlType) 
     val bseq = bs.toSeq.map(e => BUDT.deserialize(e)) 
     val a = new A(bseq) 
     a 
    } 
    } 

} 

Wszystkie cztery ch anges mają ten sam charakter: związek między obsługą A s a obsługą B s jest teraz bardzo wyraźny: w przypadku pisania schematów, serializacji i dekrealizacji. Oryginalny kod wydaje się być oparty na założeniu, że Spark SQL "po prostu to wymyśli", co może być rozsądne, ale najwyraźniej tak nie jest.

+0

To działało. Używam "złożonych" obiektów jako kolumn w DataFrame, aw Spark 1.6.0 przestał działać. Zrobiło to sztuczkę, więc lekcja, której się nauczyłem, sprawia, że ​​wszystko, co dotyczy serializacji/de-serializacji, jest bardzo wyraźne. Twoje zdrowie! –