2016-05-07 10 views
6

To powinno być łatwe, ale .... używając Spark 1.6.1 .... Mam DataFrame # 1 z kolumnami A, B, C . przy wartościach:Tworzenie nowej DataFrame Spark z nową wartością kolumny opartą na kolumnie w pierwszej ramce danych Java

A B C 
1 2 A 
2 2 A 
3 2 B 
4 2 C 

I następnie utworzyć nową dataframe z nowej kolumny D tak:

DataFrame df2 = df1.withColumn("D", df1.col("C")); 

tej pory tak dobrze, ale faktycznie chcę wartość w kolumnie D być czyli warunkowa:

// pseudo code 
if (col C = "A") the col D = "X" 
else if (col C = "B") the col D = "Y" 
else col D = "Z" 

Następnie opróżniam kolumnę C i zmieniam nazwę D na C. Próbowałem przeglądać funkcje kolumn, ale nic nie pasuje do rachunku; Pomyślałem o użyciu df1.rdd(). Map() i iteracji po wierszach, ale poza tym, że nie udało mi się go uruchomić, pomyślałem, że cały punkt DataFrames miał odejść od abstrakcji RDD?

Niestety muszę to zrobić w Javie (i oczywiście Spark z Javą nie jest optymalny !!). Wygląda na to, że brakuje mi tego oczywistego i cieszę się, że pokazałem, że jestem idiotą, gdy przedstawię rozwiązanie!

Odpowiedz

12

Uważam, że można to zrobić za pomocą when. Dodatkowo prawdopodobnie możesz zastąpić starą kolumnę bezpośrednio. Na swoim przykładzie, kod byłoby coś takiego:

import static org.apache.spark.sql.functions.*; 

Column newCol = when(col("C").equalTo("A"), "X") 
    .when(col("C").equalTo("B"), "Y") 
    .otherwise("Z"); 

DataFrame df2 = df1.withColumn("C", newCol); 

Więcej informacji na temat when sprawdzić Column Javadoc.

+1

Dzięki za to - ja rzeczywiście wpatrując się oczywiste w obliczu: s - co mi brakowało był statyczny import funkcji sql tj: import static org.apache. spark.sql.functions. * – user1128482

+0

@ user1128482 Przepraszam, zapomniałem importu. Dobrze wiedzieć, że w końcu się dowiedziałeś. –

2

Dzięki Daniel mam rozwiązać ten :)

brakującym elementem był statyczny import funkcji sql

import static org.apache.spark.sql.functions.*; 

musi próbowałem milion różnych sposobów wykorzystania kiedy, ale dostałem kompilacji błędy/błędy w czasie wykonywania, ponieważ nie dokonałem importu. Po zaimportowaniu odpowiedzi Daniela była na miejscu!

1

Możesz również użyć udf do wykonania tej samej pracy. Wystarczy napisać prosty jeśli potem jeszcze struktura

import org.apache.spark.sql.functions.udf 
val customFunct = udf { d => 
     //if then else construct 
    } 

val new_DF= df.withColumn(column_name, customFunct(df("data_column")))