2016-03-17 17 views
6

Biorąc pod uwagę następujące PySpark DataFrameJak odjąć kolumnę dni od kolumny dat w Pyspark?

df = sqlContext.createDataFrame([('2015-01-15', 10), 
           ('2015-02-15', 5)], 
           ('date_col', 'days_col')) 

Jak kolumna dni zostaną odjęte z kolumny dat? W tym przykładzie uzyskana kolumna powinna być ['2015-01-05', '2015-02-10'].

Zajrzałem do pyspark.sql.functions.date_sub(), ale wymaga to kolumny daty i jednego dnia, tj. date_sub(df['date_col'], 10). Najlepiej byłoby, gdybym zrobił date_sub(df['date_col'], df['days_col']).

Próbowałem też tworzenie UDF:

from datetime import timedelta 
def subtract_date(start_date, days_to_subtract): 
    return start_date - timedelta(days_to_subtract) 

subtract_date_udf = udf(subtract_date, DateType()) 
df.withColumn('subtracted_dates', subtract_date_udf(df['date_col'], df['days_col']) 

to technicznie działa, ale czytałem, że zwiększenie pomiędzy Spark i Python może powodować problemy z wydajnością dla dużych zbiorów danych. Mogę trzymać się tego rozwiązania na razie (nie ma potrzeby przedwczesnej optymalizacji), ale moje intuicje mówią, że musi istnieć sposób na zrobienie tej prostej rzeczy bez użycia UDF w Pythonie.

Odpowiedz

3

Udało mi się rozwiązać to za pomocą selectExpr.

df.selectExpr('date_sub(date_col, day_col) as subtracted_dates') 

Jeśli chcesz dołączyć kolumnę do oryginalnego DF, wystarczy dodać * do wyrażenia

df.selectExpr('*', 'date_sub(date_col, day_col) as subtracted_dates') 
+1

Jeśli nie masz nic przeciwko wpisywaniu kodu SQL, możesz to w prosty sposób uprościć do 'df.select (wyrażenie (" date_sub ({0}, {1}) ".format (" date_col "," days_col "))), co czyni błahe komponowanie. – zero323

1

Nie jest to najbardziej eleganckie rozwiązanie, ale jeśli nigdy nie chcesz się włamać wyrażeń SQL w Scala (nie że powinno to być trudne, ale te są prywatne do sql) coś jak to powinno załatwić sprawę:

from pyspark.sql import Column 

def date_sub_(c1: Column, c2: Column) -> Column: 
    return ((c1.cast("timestamp").cast("long") - 60 * 60 * 24 * c2) 
     .cast("timestamp").cast("date")) 

Dla adnotacji typu Python 2.x po prostu upuść.

+0

mądry. Myślę, że znalazłem nieco bardziej eleganckie rozwiązanie za pomocą 'selectExpr', ale dzięki za pomoc! – kjmij

0

nieco inny format, ale działa również:

df.registerTempTable("dfTbl") 

newdf = spark.sql(""" 
        SELECT *, date_sub(d.date_col, d.day_col) AS DateSub 
        FROM dfTbl d 
        """)