2016-10-20 40 views
5

Pracuję z Spark i PySpark. Staram się osiągnąć rezultat równoważny następującemu Pseudokod:PySpark: withColumn() z dwoma warunkami i trzema wynikami

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.) 

próbuję to zrobić w PySpark ale nie jestem pewien co do składni. Jakieś wskazówki? Zajrzałem do expr(), ale nie mogłem go uruchomić.

Należy pamiętać, że df jest jest pyspark.sql.dataframe.DataFrame.

Odpowiedz

13

Istnieje kilka skutecznych sposobów realizacji tego. Zacznijmy od wymaganych importu:

from pyspark.sql.functions import col, expr, when 

Można użyć gałąź IF funkcja wewnątrz expr:

new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))""" 
) 

lub when + otherwise:

new_column_2 = when(
    col("fruit1").isNull() | col("fruit2").isNull(), 3 
).when(col("fruit1") == col("fruit2"), 1).otherwise(0) 

Wreszcie można użyć następującej sztuczki:

from pyspark.sql.functions import coalesce, lit 

new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3)) 

z przykładowymi danymi:

df = sc.parallelize([ 
    ("orange", "apple"), ("kiwi", None), (None, "banana"), 
    ("mango", "mango"), (None, None) 
]).toDF(["fruit1", "fruit2"]) 

można wykorzystać to w następujący sposób:

(df 
    .withColumn("new_column_1", new_column_1) 
    .withColumn("new_column_2", new_column_2) 
    .withColumn("new_column_3", new_column_3)) 

a wynik jest:

+------+------+------------+------------+------------+ 
|fruit1|fruit2|new_column_1|new_column_2|new_column_3| 
+------+------+------------+------------+------------+ 
|orange| apple|   0|   0|   0| 
| kiwi| null|   3|   3|   3| 
| null|banana|   3|   3|   3| 
| mango| mango|   1|   1|   1| 
| null| null|   3|   3|   3| 
+------+------+------------+------------+------------+ 
3

będziemy chcieli użyć UDF jak poniżej

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

def func(fruit1, fruit2): 
    if fruit1 == None or fruit2 == None: 
     return 3 
    if fruit1 == fruit2: 
     return 1 
    return 0 

func_udf = udf(func, IntegerType()) 
df = df.withColumn('new_column',func_udf(df['fruit1'], df['fruit2'])) 
+0

mam kilka błędów z tego rozwiązania, @David . Pierwszy został rozwiązany za pomocą 'from pyspark.sql.types import StringType'. Drugi to: 'TypeError: 'int' obiekt nie jest wywoływalny', którego nie jestem pewien jak rozwiązać. Zwróć uwagę, że 'df' to' pyspark.sql.dataframe.DataFrame'. – user2205916

+0

@ user2205916 Miałem kilka literówek. W linii 'def func (...' miałem 'owoc 1' (z spacją) zamiast' owoc1' .W wierszu rozpoczynającym 'func_udf = ...' miałem 'StringType' zamiast' IntegerType'. Spróbuj zaktualizowanego kodu i daj mi znać, jeśli nadal masz problemy – David

+0

Pojawia się ten sam komunikat o błędzie. Ponadto, myślę, że brakuje paren na końcu 'df =. ..' – user2205916