2016-04-11 12 views
7

Używam Scala i chcę zbudować własną funkcję DataFrame. Na przykład chcę traktować kolumnę jak tablicę, iterować po każdym elemencie i wykonywać obliczenia.Funkcja Spark Build Custom, zdefiniowana przez użytkownika funkcja

Na początek próbuję wdrożyć własną metodę getMax. Więc kolumna x miałoby wartości [3,8,2,5,9], a oczekiwany wynik metody byłoby 9.

Oto jak to wygląda w Scala

def getMax(inputArray: Array[Int]): Int = { 
    var maxValue = inputArray(0) 
    for (i <- 1 until inputArray.length if inputArray(i) > maxValue) { 
    maxValue = inputArray(i) 
    } 
    maxValue 
} 

ten jest to, co mam do tej pory, i dostaję ten błąd i nie wiem jak inaczej iterować przez kolumnę.

def getMax(col: Column): Column = { 
var maxValue = col(0) 
for (i <- 1 until col.length if col(i) > maxValue){ 
    maxValue = col(i) 
} 
maxValue 

}

Gdy jestem w stanie realizować własną metodę, będę tworzyć funkcję kolumnową

val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”) 

A potem mam nadzieję być w stanie wykorzystać to w SQL, np

val sample = sqlContext.sql("SELECT value_max(x) FROM table") 

i oczekiwanego wyjściowy będzie 9 danej kolumny sterującej [3,8,2,5,9]

Podążam za odpowiedzią z innego wątku Spark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frame, gdzie tworzy ona prywatną metodę odchylenia standardowego. Obliczenia, które zrobię, będą bardziej skomplikowane niż te (np. Będę porównywał każdy element w kolumnie), czy podążę we właściwym kierunku, czy powinienem bardziej przyjrzeć się funkcjom zdefiniowanym przez użytkownika?

+0

Proszę pokazać swoje wejście i wyjście/oczekiwane dataframes. Użyj 'show'. –

+0

Witam @JacekLaskowski dzięki za komentarz, zredagowałem pytanie, aby było jaśniejsze, co chciałbym osiągnąć. – other15

Odpowiedz

13

W DataFrame Spark nie można wykonywać iteracji elementów kolumny za pomocą podejść, o których myśleliście, ponieważ kolumna nie jest obiektem iteracyjnym.

jednak przetwarzać wartości w kolumnie, masz kilka opcji i właściwa zależy od zadania:

1) za pomocą istniejących wbudowanych funkcji

Spark SQL już mnóstwo przydatnych funkcji do przetwarzania kolumn, w tym funkcje agregacji i transformacji. Większość z nich znajduje się w pakiecie functions (documentation here). Niektóre inne (ogólnie funkcje binarne) można znaleźć bezpośrednio w obiekcie Column (). Tak więc, jeśli możesz z nich korzystać, zazwyczaj jest to najlepsza opcja. Uwaga: nie zapomnij o Window Functions.

2) Tworzenie UDF

Jeśli nie można zakończyć zadanie z wbudowanych funkcji, można rozważyć zdefiniowania UDF (User Defined Function). Są one użyteczne, gdy możesz przetwarzać każdy element kolumny niezależnie i spodziewasz się utworzyć nową kolumnę z taką samą liczbą wierszy, jak oryginalna kolumna (nie jest to kolumna zagregowana). Takie podejście jest dość proste: najpierw definiujesz prostą funkcję, następnie rejestrujesz ją jako UDF, a następnie używasz.Przykład:

def myFunc: (String => String) = { s => s.toLowerCase } 

import org.apache.spark.sql.functions.udf 
val myUDF = udf(myFun) 

val newDF = df.withColumn("newCol", myUDF(df("oldCol"))) 

Aby uzyskać więcej informacji, here's miły artykuł.

3) przy użyciu UDAF

Jeśli zadaniem jest stworzenie dane zagregowane, można zdefiniować UDAF (User Defined Function Aggregation). Nie mam dużego doświadczenia z tym, ale mogę wskazać na ładnym tutorialu:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

4) powróci do RDD przetwarzania

Jeśli naprawdę nie mogę użyj powyższych opcji lub jeśli przetwarzanie zadania zależy od różnych wierszy do przetworzenia, a to nie jest agregacja, to myślę, że musiałbyś wybrać kolumnę, którą chcesz i przetworzyć za pomocą odpowiedniego RDD. Przykład:

val singleColumnDF = df("column") 

val myRDD = singleColumnDF.rdd 

// process myRDD 

więc nie było opcji mogłem pomyśleć. Mam nadzieję, że to pomoże.

+0

Dzięki Daniel, bardzo pouczające. Tak więc główna różnica między UDF i UDAF polega na tym, że UDAF zwraca jedną wartość na podstawie obliczenia kolumny? Mam nadzieję, że wbudowane funkcje będą wystarczające dla tego, co chcę zrobić, ale dobrze byłoby wiedzieć, jak wdrożyć moje własne funkcje. – other15

+0

@ other15 UDAF jest zwykle stosowany z 'groupBy', więc może zwracać zagregowaną wartość dla każdej odrębnej wartości w kolumnach przekazywanych do' groupBy' (podobnie do prostego 'df.groupBy (" klucz "). Agg (avg ("value")) "działa). Jeśli jednak nie używasz groupBy, UDAF zwróci tylko jedną wartość. –

0

Prostym przykładem jest podana w excellent documentation, gdzie cała sekcja jest poświęcona UDF:

import org.apache.spark.sql._ 

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") 
val spark = df.sparkSession 
spark.udf.register("simpleUDF", (v: Int) => v * v) 
df.select($"id", callUDF("simpleUDF", $"value"))