2016-08-17 23 views
13

Podczas pracy z DataFrames Sparka, funkcje zdefiniowane przez użytkownika (UDF) są wymagane do mapowania danych w kolumnach. UDF wymagają, aby typy argumentów były jawnie określone. W moim przypadku potrzebuję manipulować kolumną, która składa się z tablic obiektów i nie wiem, jakiego typu użyć. Oto przykład:Definiowanie UDF, który akceptuje Array obiektów w Spark DataFrame?

import sqlContext.implicits._ 

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects 
val data = sqlContext.read.json(sc.parallelize(Seq(
    """ 
    |{ 
    | "topic" : "pets", 
    | "subjects" : [ 
    | {"type" : "cat", "score" : 10}, 
    | {"type" : "dog", "score" : 1} 
    | ] 
    |} 
    """))) 

To stosunkowo proste w użyciu wbudowanych org.apache.spark.sql.functions do wykonywania podstawowych operacji na danych w kolumnach

import org.apache.spark.sql.functions.size 
data.select($"topic", size($"subjects")).show 

+-----+--------------+ 
|topic|size(subjects)| 
+-----+--------------+ 
| pets|    2| 
+-----+--------------+ 

i to na ogół łatwo pisać zwyczaj UDF wykonywać dowolne operacje

import org.apache.spark.sql.functions.udf 
val enhance = udf { topic : String => topic.toUpperCase() } 
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+ 
|UDF(topic)|size(subjects)| 
+----------+--------------+ 
|  PETS|    2| 
+----------+--------------+ 

Ale co, jeśli chcę użyć UDF do manipulowania tablicą obiektów w kolumnie "podmioty"? Jakiego typu używam do argumentu w UDF? Na przykład, jeśli chcę reimplement funkcję rozmiarze, zamiast korzystać z jednego dostarczonego przez iskry:

val my_size = udf { subjects: Array[Something] => subjects.size } 
data.select($"topic", my_size($"subjects")).show 

Wyraźnie Array[Something] nie działa ... jakiego typu należy użyć !? Czy powinienem całkowicie zrezygnować z Array[]? Klinowanie się mówi mi, że może mieć z tym coś wspólnego, ale wciąż jest inny typ, który muszę podać.

Odpowiedz

16

Czego szukasz Seq[o.a.s.sql.Row]:

import org.apache.spark.sql.Row 

val my_size = udf { subjects: Seq[Row] => subjects.size } 

Wyjaśnienie:

  • Obecna reprezentacja ArrayType jest, jak już wiesz, WrappedArray tak Array nie będzie działać i to lepiej pozostać po bezpiecznej stronie.
  • Typ lokalny dla StructType to . Niestety oznacza to, że dostęp do poszczególnych pól nie jest bezpieczny.

Uwagi:

  • celu utworzenia struct funkcji przekazanej udf musi powrócić Product typu (Tuple* lub case class) nie .
+0

uzyskać to: java.lang.UnsupportedOperationException: Schema dla typu org.apache.spark.sql.Row nie jest obsługiwana na org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection. scala: 733) at org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) at org.apache.spark.sql.functions $ .udf (functions.scala: 3076) . .. 134 elided –

+0

@GuruprasadGV UDF powinien zwrócić 'Product' (' TupleN', case case) dla 'structs'. – zero323