11

Pracuję poprzez Databricks example. Schemat dla dataframe wygląda następująco:Rozbijanie zagnieżdżonych struktur w ramce danych Spark

> parquetDF.printSchema 
root 
|-- department: struct (nullable = true) 
| |-- id: string (nullable = true) 
| |-- name: string (nullable = true) 
|-- employees: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- firstName: string (nullable = true) 
| | |-- lastName: string (nullable = true) 
| | |-- email: string (nullable = true) 
| | |-- salary: integer (nullable = true) 

Na przykład, pokazują jak eksplodować kolumnę pracowników do 4 dodatkowych kolumn:

val explodeDF = parquetDF.explode($"employees") { 
case Row(employee: Seq[Row]) => employee.map{ employee => 
    val firstName = employee(0).asInstanceOf[String] 
    val lastName = employee(1).asInstanceOf[String] 
    val email = employee(2).asInstanceOf[String] 
    val salary = employee(3).asInstanceOf[Int] 
    Employee(firstName, lastName, email, salary) 
} 
}.cache() 
display(explodeDF) 

Jak bym zrobić coś podobnego z kolumny dział (tj. dodać dwie dodatkowe kolumny do ramki danych o nazwie "id" i "nazwa")? Metody te nie są dokładnie takie same, a ja mogę tylko dowiedzieć się, jak utworzyć ramkę nowy danych przy użyciu:

val explodeDF = parquetDF.select("department.id","department.name") 
display(explodeDF) 

Gdy próbuję:

val explodeDF = parquetDF.explode($"department") { 
    case Row(dept: Seq[String]) => dept.map{dept => 
    val id = dept(0) 
    val name = dept(1) 
    } 
}.cache() 
display(explodeDF) 

pojawia się ostrzeżenie i błąd:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure 
      case Row(dept: Seq[String]) => dept.map{dept => 
         ^
<console>:37: error: inferred type arguments [Unit] do not conform to method explode's type parameter bounds [A <: Product] 
    val explodeDF = parquetDF.explode($"department") { 
           ^

Odpowiedz

3

można użyć coś takiego:

var explodeDF = explodeDF.withColumn("id", explodeDF("department.id")) 
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name")) 

które pomogły mi na te pytania i:

+0

Awaria etap: org.apache.spark.SparkException: Praca przerwana z powodu awarii scenicznej: Zadanie 0 w etapie 41,0 powiodło się 4 razy, najnowsze awaria: Przegrany zadaniowych 0,3 w etapie 41,0 (TID 1403, 10,81. 214.49): scala.MatchError: [[789012, Mechanical Engineering]] (klasy org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) – Feynman27

+0

@ Feynman27 robi [this] (http://stackoverflow.com/questions/ 25222989/usuwanie-w-scala-nie-zmienny-typ-argument-czy-niezaznaczony-od-to-to) pomaga? Wygląda na to, że pasuje do * twojej * próby. Myślę, że problem z moją odpowiedzią jest taki, że "pracownicy" mają również element, a "dział" nie. – gsamaras

+0

Tak, przykład 'employees' tworzy nowe wiersze, podczas gdy przykład' department' powinien tworzyć tylko dwie nowe kolumny. – Feynman27

2

To wydaje się działać (choć może nie najbardziej eleganckie rozwiązanie).

var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id")) 
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name")) 
+0

można "val explodeDF2 = explodeDF.withColumn (" id ", explodeDF (" department.id ")). WithColumn (" name ", explodeDF2 (" department.name "))' – Davos