2017-01-18 42 views
6

Jak mogę dodać lub zamienić pola w strukturze na dowolnym poziomie zagnieżdżonym?Dodawanie kolumny zagnieżdżonej do Spark DataFrame

Wejście:

val rdd = sc.parallelize(Seq(
    """{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""", 
    """{"a": {"xX": 3},"b": {"z": 0}}""", 
    """{"a": {"XX": 3},"b": {"z": 0}}""", 
    """{"a": {"xx": 4},"b": {"z": 0}}""")) 
var df = sqlContext.read.json(rdd) 

dała następujący schemat:

root 
|-- a: struct (nullable = true) 
| |-- XX: long (nullable = true) 
| |-- xX: long (nullable = true) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

Wtedy mogę to zrobić:

import org.apache.spark.sql.functions._ 
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withColumn("a_xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 
    .dropNestedColumn("a.xx") 

(dropNestedColumn jest wypożyczony z tej odpowiedzi: https://stackoverflow.com/a/39943812/1068385. Po prostu szukam odwrotnej operacji na tego)

a schemat staje.

root 
|-- a: struct (nullable = false) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 
|-- a_xx: long (nullable = true) 

Oczywiście nie zastąpi (lub dodać) a.xx ale zamiast tego dodaje nowe pole a_xx na poziomie głównym.

Chciałbym móc to zrobić w zamian:

val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withNestedColumn("a.xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 

Tak, że doprowadziłoby to do tego schematu:

root 
|-- a: struct (nullable = false) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

Jak mogę to osiągnąć?

Praktyczny cel w tym przypadku ma być niewrażliwy na wielkość liter z nazwami kolumn w wejściowym JSON. Ostatni krok będzie prosty: zbierz wszystkie nakładające się nazwy kolumn i zastosuj koalescencję na każdym z nich.

+0

wziąłeś rozwiązanie? –

+0

@ShankarKoirala: nie ze Spark. W Hive było trywialnie używać COALESCE do osiągnięcia tego, co chciałem. – Arvidaa

Odpowiedz

1

To nie może być tak elegancki lub równie skuteczne jak to może być, ale tutaj jest to, co wymyśliłem:

object DataFrameUtils { 
    private def nullableCol(parentCol: Column, c: Column): Column = { 
    when(parentCol.isNotNull, c) 
    } 

    private def nullableCol(c: Column): Column = { 
    nullableCol(c, c) 
    } 

    private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = { 
    splitted 
     .foldRight(newCol) { 
     case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName)) 
     } 
    } 

    private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = { 
    colType match { 
     case colType: StructType if splitted.nonEmpty => { 
     var modifiedFields: Seq[(String, Column)] = colType.fields 
      .map(f => { 
      var curCol = col.getField(f.name) 
      if (f.name == splitted.head) { 
       curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol) 
      } 
      (f.name, curCol as f.name) 
      }) 

     if (!modifiedFields.exists(_._1 == splitted.head)) { 
      modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head) 
     } 

     var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*) 
     if (nullable) { 
      modifiedStruct = nullableCol(col, modifiedStruct) 
     } 
     modifiedStruct 
     } 
     case _ => createNestedStructs(splitted, newCol) 
    } 
    } 

    private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = { 
    if (newColName.contains('.')) { 
     var splitted = newColName.split('.') 

     val modifiedOrAdded: (String, Column) = df.schema.fields 
     .find(_.name == splitted.head) 
     .map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol))) 
     .getOrElse { 
      (splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head) 
     } 

     df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2) 

    } else { 
     // Top level addition, use spark method as-is 
     df.withColumn(newColName, newCol) 
    } 
    } 

    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Add nested field to DataFrame 
     * 
     * @param newColName Dot-separated nested field name 
     * @param newCol New column value 
     */ 
    def withNestedColumn(newColName: String, newCol: Column): DataFrame = { 
     DataFrameUtils.addNestedColumn(df, newColName, newCol) 
    } 
    } 
} 

Zapraszam do poprawy na nim.

val data = spark.sparkContext.parallelize(List("""{ "a1": 1, "a3": { "b1": 3, "b2": { "c1": 5, "c2": 6 } } }""")) 
val df: DataFrame = spark.read.json(data) 

val df2 = df.withNestedColumn("a3.b2.c3.d1", $"a3.b2") 

powinna produkować:

assertResult("struct<a1:bigint,a3:struct<b1:bigint,b2:struct<c1:bigint,c2:bigint,c3:struct<d1:struct<c1:bigint,c2:bigint>>>>>")(df2.shema.simpleString) 
+0

Dzięki. Sprawdzę to w przyszłym tygodniu i jeśli działa, oznacz jako zaakceptowaną odpowiedź. – Arvidaa

+0

@Michel Lemay Działa dobrze w przypadku w pytaniu. Dzięki. Próbuję zastosować go do zagnieżdżonej tablicy struktur i nie działa, jest to trochę za daleko dla mojej rzeczywistej wiedzy o iskrze ... Czy możesz mi pomóc? – Gab

+0

Rzeczywiście, nie jest to funkcja, której potrzebujemy, więc zostawiłem ją dla przyszłych ulepszeń. Aby obsłużyć to przy użyciu obecnego kodu, należałoby zmodyfikować tablice 'case _' i wsparcie dla zagnieżdżonych struktur. Zagnieżdżone typy proste również muszą być promowane do struktury. Musimy również obsługiwać tablice w newCol i radzić sobie z możliwie różną liczbą elementów w tablicy docelowej. –