8

Próbuję przekształcić ramkę danych za pomocą funkcji, która pobiera tablicę jako parametr. Mój kod wygląda mniej więcej tak:Prześlij tablicę jako parametr UDF w Spark SQL

def getCategory(categories:Array[String], input:String): String = { 
    categories(input.toInt) 
} 

val myArray = Array("a", "b", "c") 

val myCategories =udf(getCategory _) 

val df = sqlContext.parquetFile("myfile.parquet) 

val df1 = df.withColumn("newCategory", myCategories(lit(myArray), col("myInput")) 

Jednak podświetlony nie lubi tablic i błędów tego skryptu. Próbowałem definining nową funkcję częściowo zastosowanie i potem UDF potem:

val newFunc = getCategory(myArray, _:String) 
val myCategories = udf(newFunc) 

val df1 = df.withColumn("newCategory", myCategories(col("myInput"))) 

To nie działa albo jak uzyskać wyjątek NullPointer i wydaje myArray nie jest rozpoznawane. Jakieś pomysły dotyczące przekazywania tablicy jako parametru do funkcji z ramką danych?

Oddzielnie, każde wyjaśnienie, dlaczego robienie czegoś prostego, jak korzystanie z funkcji na ramce danych, jest tak skomplikowane (zdefiniować funkcję, przedefiniować ją jako UDF itp.)?

Odpowiedz

7

Najprawdopodobniej nie najładniejszym rozwiązaniem, ale można spróbować coś takiego:

def getCategory(categories: Array[String]) = { 
    udf((input:String) => categories(input.toInt)) 
} 

df.withColumn("newCategory", getCategory(myArray)(col("myInput"))) 

Można również spróbować array literałów:

val getCategory = udf(
    (input:String, categories: Array[String]) => categories(input.toInt)) 

df.withColumn(
    "newCategory", getCategory($"myInput", array(myArray.map(lit(_)): _*))) 

Na marginesie użyciu Map zamiast Array to prawdopodobnie lepszy pomysł:

def mapCategory(categories: Map[String, String], default: String) = { 
    udf((input:String) => categories.getOrElse(input, default)) 
} 

val myMap = Map[String, String]("1" -> "a", "2" -> "b", "3" -> "c") 

df.withColumn("newCategory", mapCategory(myMap, "foo")(col("myInput"))) 

Od Spark 1.5.0 można również użyć array funkcję:

import org.apache.spark.sql.functions.array 

val colArray = array(myArray map(lit _): _*) 
myCategories(lit(colArray), col("myInput")) 

Zobacz również Spark UDF with varargs