2015-10-19 15 views
13

Używam Spark SQL (wspominam, że jest on w Sparku, jeśli ma wpływ na składnię SQL - nie jestem dostatecznie znany, aby się upewnić) i mam tabela, którą próbuję zmienić strukturę, ale utknąłem próbując transponować wiele kolumn w tym samym czasie.Rozbij (transponuj?) Wiele kolumn w tabeli Spark SQL

Zasadniczo mam dane, które wygląda następująco:

userId someString  varA  varB 
    1  "example1" [0,2,5] [1,2,9] 
    2  "example2" [1,20,5] [9,null,6] 

i chciałbym eksplodować zarówno Vara i varB jednocześnie (długość będzie zawsze zgodny) - tak, że ostateczne wyjście wygląda następująco:

userId someString  varA  varB 
    1  "example1"  0   1 
    1  "example1"  2   2 
    1  "example1"  5   9 
    2  "example2"  1   9 
    2  "example2"  20  null 
    2  "example2"  5   6 

ale mogę tylko wydają się uzyskać jeden eksplodować (vAR) oświadczenie do pracy w jednym poleceniu, a jeśli próbuję łańcucha nich (tj tworzenie temp tabeli po pierwszej komendy wybuchnąć) potem oczywiście dostać ogromna liczba duplikatów, niepotrzebnych wierszy.

Wielkie dzięki!

Odpowiedz

21

Czego pragniesz, nie jest możliwe bez niestandardowego UDF. W Scala można zrobić coś takiego:

val data = sc.parallelize(Seq(
    """{"userId": 1, "someString": "example1", 
     "varA": [0, 2, 5], "varB": [1, 2, 9]}""", 
    """{"userId": 2, "someString": "example2", 
     "varA": [1, 20, 5], "varB": [9, null, 6]}""" 
)) 

val df = sqlContext.read.json(data) 

df.printSchema 
// root 
// |-- someString: string (nullable = true) 
// |-- userId: long (nullable = true) 
// |-- varA: array (nullable = true) 
// | |-- element: long (containsNull = true) 
// |-- varB: array (nullable = true) 
// | |-- element: long (containsNull = true) 

Teraz możemy zdefiniować zip UDF:

import org.apache.spark.sql.functions.{udf, explode} 

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
    $"userId", $"someString", 
    $"vars._1".alias("varA"), $"vars._2".alias("varB")).show 

// +------+----------+----+----+ 
// |userId|someString|varA|varB| 
// +------+----------+----+----+ 
// |  1| example1| 0| 1| 
// |  1| example1| 2| 2| 
// |  1| example1| 5| 9| 
// |  2| example2| 1| 9| 
// |  2| example2| 20|null| 
// |  2| example2| 5| 6| 
// +------+----------+----+----+ 

z surowego SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 
df.registerTempTable("df") 

sqlContext.sql(
    """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""") 
+0

Może to być stosowany na 3 kolumny, które mają typ sekwencji? –

+0

@AmitKumar Tak, dlaczego nie? Musisz dostosować sygnaturę i ciało, ale nie jest to trudne. – zero323

+0

Zastanawiam się, czy w nowszych interfejsach API danych można po prostu użyć map i spakować tablice razem bez tworzenia UDF i czy byłby on szybszy/skalowalny/zoptymalizowany przez silnik wykonawczy katalizatora. Spróbuję, kiedy na konsoli. – Davos