15

Używam pyspark, ładowanie dużego pliku csv do ramki danych z iskrowym csv, a jako etap wstępnego przetwarzania muszę zastosować różne operacje do danych dostępnych w jednej z kolumn (zawierającej ciąg jsonów). To zwróci wartości X, z których każda musi być przechowywana w osobnej kolumnie.Apache Spark - Przypisywanie wyniku UDF do wielu kolumn kolumny danych

Ta funkcjonalność zostanie zaimplementowana w UDF. Jednak nie jestem pewien, jak zwrócić listę wartości z tego UDF i podawać je do poszczególnych kolumn. Poniżej znajduje się prosty przykład:

(...) 
from pyspark.sql.functions import udf 
def udf_test(n): 
    return [n/2, n%2] 

test_udf=udf(udf_test) 


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4) 

która produkuje następujące:

+------+----------+--------------------+ 
|amount|trans_date|    test| 
+------+----------+--------------------+ 
| 28.0|2016-02-07|   [14.0, 0.0]| 
| 31.01|2016-02-07|[15.5050001144409...| 
| 13.41|2016-02-04|[6.70499992370605...| 
| 307.7|2015-02-17|[153.850006103515...| 
| 22.09|2016-02-05|[11.0450000762939...| 
+------+----------+--------------------+ 
only showing top 5 rows 

Jaki byłby najlepszy sposób na przechowywanie dwóch (w tym przykładzie) Wartości zwracane przez UDF na oddzielnych kolumnach? Teraz są one wpisane jako ciągi:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema() 

root 
|-- amount: float (nullable = true) 
|-- trans_date: string (nullable = true) 
|-- test: string (nullable = true) 

Odpowiedz

25

Nie jest możliwe, aby utworzyć wiele kolumn najwyższego poziomu od jednego UDF rozmowy, ale można utworzyć nowy struct. Wymaga to UDF z określonym returnType:

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("foo", FloatType(), False), 
    StructField("bar", FloatType(), False) 
]) 

def udf_test(n): 
    return (n/2, n % 2) if n and n != 0.0 else (float('nan'), float('nan')) 

test_udf = udf(udf_test, schema) 
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"]) 

foobars = df.select(test_udf("y").alias("foobar")) 
foobars.printSchema() 
## root 
## |-- foobar: struct (nullable = true) 
## | |-- foo: float (nullable = false) 
## | |-- bar: float (nullable = false) 

Ty dalej spłaszczyć schematu z prostego select:

foobars.select("foobar.foo", "foobar.bar").show() 
## +---+---+ 
## |foo|bar| 
## +---+---+ 
## |1.0|0.0| 
## |1.5|1.0| 
## +---+---+ 

Zobacz także Derive multiple columns from a single column in a Spark DataFrame

+0

fantastyczne! Działa to bardzo dobrze w tym, czego potrzebowałem. Byłem tam przez większość drogi, ale karmiłem schemat StructType niepoprawnie do udf, co powodowało, że moja nowa kolumna kończyła się jako StringType. Wielkie dzięki! –

+0

Dzięki! Właśnie tego szukałem. :) – dksahuji