2015-11-24 10 views
12

Mam DataFrame dwóch kolumn, ID typu Int i Vec typu Vector (org.apache.spark.mllib.linalg.Vector).Jak zdefiniować niestandardową funkcję agregacji, aby podsumować kolumnę wektorów?

DataFrame wygląda następująco:

ID,Vec 
1,[0,0,5] 
1,[4,0,1] 
1,[1,2,1] 
2,[7,5,0] 
2,[3,3,4] 
3,[0,8,1] 
3,[0,0,1] 
3,[7,7,7] 
.... 

Chciałbym zrobić groupBy($"ID") następnie zastosować agregację w wierszach wewnątrz każdej grupy poprzez zsumowanie wektory.

Pożądana moc wyżej przykładzie byłaby:

ID,SumOfVectors 
1,[5,2,7] 
2,[10,8,4] 
3,[7,15,9] 
... 

Dostępne funkcje agregacji nie działa, np df.groupBy($"ID").agg(sum($"Vec") doprowadzi do wyjątku ClassCastException.

Jak zaimplementować niestandardową funkcję agregacji, która pozwala mi wykonywać sumę wektorów lub tablic lub dowolną inną niestandardową operację?

+3

Możliwy duplikat [Jak zdefiniować i użyć funkcji agregującej zdefiniowanej przez użytkownika w Spark SQL?] (Http://stackoverflow.com/questions/32100973/how-can-i-define-and-use-a-user -defined-aggregate-function-in-spark-sql) –

Odpowiedz

19

Osobiście nie zawracałbym sobie głowy UDAF-ami. Jest więcej niż gadatliwy i niezbyt szybki. Zamiast tego po prostu używać reduceByKey/foldByKey:

import org.apache.spark.sql.Row 
import breeze.linalg.{DenseVector => BDV} 
import org.apache.spark.ml.linalg.{Vector, Vectors} 

val rdd = sc.parallelize(Seq(
    (1, "[0,0,5]"), (1, "[4,0,1]"), (1, "[1,2,1]"), 
    (2, "[7,5,0]"), (2, "[3,3,4]"), (3, "[0,8,1]"), 
    (3, "[0,0,1]"), (3, "[7,7,7]"))) 

val df = rdd.map{case (k, v) => (k, Vectors.parse(v))}.toDF("id", "vec") 

val aggregated = df 
    .rdd 
    .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } 
    .foldByKey(BDV(Array.fill(3)(0.0)))(_ += _) 
    .mapValues(v => Vectors.dense(v.toArray)) 
    .toDF("id", "vec") 

aggregated.show 

// +---+--------------+ 
// | id|   vec| 
// +---+--------------+ 
// | 1| [5.0,2.0,7.0]| 
// | 2|[10.0,8.0,4.0]| 
// | 3|[7.0,15.0,9.0]| 
// +---+--------------+ 

I tak dla porównania "prosty" UDAF. Wymagane import:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
    UserDefinedAggregateFunction} 
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} 
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} 
import org.apache.spark.sql.Row 
import scala.collection.mutable.WrappedArray 

definicja klasy:

class VectorSum (n: Int) extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("v", SQLDataTypes.VectorType) 
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType)) 
    def dataType = SQLDataTypes.VectorType 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer.update(0, Array.fill(n)(0.0)) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     if (!input.isNullAt(0)) { 
     val buff = buffer.getAs[WrappedArray[Double]](0) 
     val v = input.getAs[Vector](0).toSparse 
     for (i <- v.indices) { 
      buff(i) += v(i) 
     } 
     buffer.update(0, buff) 
     } 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
     val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
     for ((x, i) <- buff2.zipWithIndex) { 
     buff1(i) += x 
     } 
     buffer1.update(0, buff1) 
    } 

    def evaluate(buffer: Row) = Vectors.dense(
     buffer.getAs[Seq[Double]](0).toArray) 
} 

i wykorzystanie przykład:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show 

// +---+--------------+ 
// | id|   vec| 
// +---+--------------+ 
// | 1| [5.0,2.0,7.0]| 
// | 2|[10.0,8.0,4.0]| 
// | 3|[7.0,15.0,9.0]| 
// +---+--------------+ 

Zobacz także: How to find mean of grouped Vector columns in Spark SQL?.

+0

Widzę, że sztuczka polega na użyciu breeze.linalg.DensVector, dlaczego działa, a gęste wektory z mllib.linalg nie? – Rami

+1

Problem polega na tym, że nie ma metody '+' dla wersji Scala 'mllib.linalg.Vector'. – zero323

+0

Nie można tego zrobić za pomocą DF lub SQL? – oluies

0

Proponuję następujące (prace nad dalszą Spark 2.0.2), to może być zoptymalizowane, ale to bardzo miłe, jedno trzeba wiedzieć z wyprzedzeniem jest wielkości wektorowych podczas tworzenia instancji UDAF

import org.apache.spark.ml.linalg._ 
import org.apache.spark.mllib.linalg.WeightedSparseVector 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class VectorAggregate(val numFeatures: Int) 
    extends UserDefinedAggregateFunction { 

private type B = Map[Int, Double] 

def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil) 

def bufferSchema: StructType = 
StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil) 

def initialize(buffer: MutableAggregationBuffer): Unit = 
buffer.update(0, Map.empty[Int, Double]) 

def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    val zero = buffer.getAs[B](0) 
    input match { 
     case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) 
     case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) }} 
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
val zero = buffer1.getAs[B](0) 
buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero){case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d))})} 

def deterministic: Boolean = true 

def evaluate(buffer: Row): Any = { 
    val Row(agg: B) = buffer 
    val indices = agg.keys.toArray.sorted 
    Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed 
} 

def dataType: DataType = new VectorUDT() 
}