2016-12-07 25 views
11

Mam ramkę danych, która ma jeden wiersz i kilka kolumn. Niektóre z kolumn są pojedynczymi wartościami, a inne są listami. Wszystkie kolumny listy mają tę samą długość. Chcę podzielić każdą kolumnę listy do osobnego wiersza, zachowując dowolną kolumnę bez listy.Pyspark: Podział wielu kolumn tablicy na wiersze

próbki DF:

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')]) 
# +---+---------+---------+---+ 
# | a|  b|  c| d| 
# +---+---------+---------+---+ 
# | 1|[1, 2, 3]|[7, 8, 9]|foo| 
# +---+---------+---------+---+ 

Czego chcę:

+---+---+----+------+ 
| a| b| c | d | 
+---+---+----+------+ 
| 1| 1| 7 | foo | 
| 1| 2| 8 | foo | 
| 1| 3| 9 | foo | 
+---+---+----+------+ 

Gdybym miał tylko jedną kolumnę listy, to byłoby proste po prostu robi explode:

df_exploded = df.withColumn('b', explode('b')) 
# >>> df_exploded.show() 
# +---+---+---------+---+ 
# | a| b|  c| d| 
# +---+---+---------+---+ 
# | 1| 1|[7, 8, 9]|foo| 
# | 1| 2|[7, 8, 9]|foo| 
# | 1| 3|[7, 8, 9]|foo| 
# +---+---+---------+---+ 

Jednak jeśli spróbuję również explode kolumna , otrzymam danef Rame o długości kwadrat, czego chcę:

df_exploded_again = df_exploded.withColumn('c', explode('c')) 
# >>> df_exploded_again.show() 
# +---+---+---+---+ 
# | a| b| c| d| 
# +---+---+---+---+ 
# | 1| 1| 7|foo| 
# | 1| 1| 8|foo| 
# | 1| 1| 9|foo| 
# | 1| 2| 7|foo| 
# | 1| 2| 8|foo| 
# | 1| 2| 9|foo| 
# | 1| 3| 7|foo| 
# | 1| 3| 8|foo| 
# | 1| 3| 9|foo| 
# +---+---+---+---+ 

Co chcę jest - dla każdej kolumny, wziąć n-ty element tablicy w tej kolumnie i dodać, że do nowego wiersza. Próbowałem mapowania eksplodować po drugiej stronie wszystkich kolumn w dataframe, ale to nie wydają się działać albo:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF() 

Odpowiedz

13

Z DataFrames i UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType 
from pyspark.sql.functions import col, udf 

zip_ = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     # Adjust types to reflect data types 
     StructField("first", IntegerType()), 
     StructField("second", IntegerType()) 
    ])) 
) 

(df 
    .withColumn("tmp", zip_("b", "c")) 
    # UDF output cannot be directly passed to explode 
    .withColumn("tmp", explode("tmp")) 
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d")) 

Z RDDs:

(df 
    .rdd 
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) 
    .toDF(["a", "b", "c", "d"])) 

Oba rozwiązania są nieefektywne ze względu na obciążenie komunikacyjne w języku Python. Jeśli rozmiar danych jest stałe można zrobić coś takiego:

from functools import reduce 
from pyspark.sql import DataFrame 

# Length of array 
n = 3 

# For legacy Python you'll need a separate function 
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") 
     for i in range(n)) 
).toDF("a", "b", "c", "d") 

lub nawet:

from pyspark.sql.functions import array, struct 

# SQL level zip of arrays of known size 
# followed by explode 
tmp = explode(array(*[ 
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) 
    for i in range(n) 
])) 

(df 
    .withColumn("tmp", tmp) 
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d")) 

ten powinien być znacznie szybszy w porównaniu z UDF lub RDD. Uogólnione wspierać dowolną liczbę kolumn:

# This uses keyword only arguments 
# If you use legacy Python you'll have to change signature 
# Body of the function can stay the same 
def zip_and_explode(*colnames, n): 
    return explode(array(*[ 
     struct(*[col(c).getItem(i).alias(c) for c in colnames]) 
     for i in range(n) 
    ])) 

df.withColumn("tmp", zip_and_explode("b", "c", n=3)) 
4

trzeba by użyć flatMap, nie map jak chcesz mieć wiele wierszy wyjściowe z każdego wiersza wejściowego.

from pyspark.sql import Row 
def dualExplode(r): 
    rowDict = r.asDict() 
    bList = rowDict.pop('b') 
    cList = rowDict.pop('c') 
    for b,c in zip(bList, cList): 
     newDict = dict(rowDict) 
     newDict['b'] = b 
     newDict['c'] = c 
     yield Row(**newDict) 

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))