2016-07-14 39 views
15

Próbuję nauczyć się używać DataFrames i DataSet więcej oprócz RDD. W przypadku RDD wiem, że mogę wykonać someRDD.reduceByKey((x,y) => x + y), ale nie widzę tej funkcji dla zestawu danych. Więc postanowiłem napisać jedną.Trasowanie własnego reduByKey w Spark Dataset

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => { 
    val result = mutable.HashMap.empty[(Long,Long),Int] 
    val keys = mutable.HashSet.empty[(Long,Long)] 
    y.keys.foreach(z => keys += z) 
    x.keys.foreach(z => keys += z) 
    for (elem <- keys) { 
    val s1 = if(x.contains(elem)) x(elem) else 0 
    val s2 = if(y.contains(elem)) y(elem) else 0 
    result(elem) = s1 + s2 
    } 
    result 
}) 

Powoduje to jednak zwrócenie wszystkiego do sterownika. Jak napisałbyś to, aby zwrócić Dataset? Może map Partition i zrobić to tam?

Uwaga to kompiluje, ale nie działa, ponieważ nie ma dla koderów Map jeszcze

+0

ze Spark 2.0.0, spróbuj tego, yourDataset.groupByKey (...). ReduceGroups (...) –

+6

Czy optymalizator katalizatora zauważy, że robisz grupę, a następnie redukuje i zwiększa wydajność? Przez "skuteczny" mam na myśli, w jaki sposób na RDD robi redukcję przez klucz jest lepszy niż robienie grupy przez to zmniejszyć przez? –

Odpowiedz

18

zakładam, twoim celem jest przełożenie tego idiomu do zbiorów danych:

rdd.map(x => (x.someKey, x.someField)) 
    .reduceByKey(_ + _) 

// => returning an RDD of (KeyType, FieldType) 

Obecnie najbliższy rozwiązanie ja które z zestawu danych API znaleziono wygląda następująco:

ds.map(x => (x.someKey, x.someField))   // [1] 
    .groupByKey(_._1)        
    .reduceGroups((a, b) => (a._1, a._2 + b._2)) 
    .map(_._2)         // [2] 

// => returning a Dataset of (KeyType, FieldType) 

// Comments: 
// [1] As far as I can see, having a map before groupByKey is required 
//  to end up with the proper type in reduceGroups. After all, we do 
//  not want to reduce over the original type, but the FieldType. 
// [2] required since reduceGroups converts back to Dataset[(K, V)] 
//  not knowing that our V's are already key-value pairs. 

nie wygląda bardzo elegancko i stosownie do szybkiego odniesienia jest również dużo l ess wydajność, więc może czegoś tu brakuje ...

Uwaga: Alternatywą może być użycie groupByKey(_.someKey) jako pierwszego kroku. Problem polega na tym, że użycie groupByKey zmienia typ ze zwykłego Dataset na KeyValueGroupedDataset. Ten ostatni nie ma regularnej funkcji map. Zamiast tego oferuje mapGroups, co nie wydaje się zbyt wygodne, ponieważ zawija wartości do Iterator i wykonuje losowanie zgodnie z docstringiem.

+3

To działa. Po prostu uwaga, funkcja reduceByKey jest bardziej wydajna, ponieważ zmniejsza się w każdym węźle przed przetasowaniem. Wykonanie pierwszego tasowania grupy przez Keyy spowoduje redukcję wszystkich elementów. Dlatego jest znacznie mniej wydajny. Co zabawne, to było to, co robiłem, zanim dowiedziałem się o reduceByKey, ale zapomniałem :-) –

+0

@CarlosBribiescas Czytałem na interwebs, że zestawy danych wykorzystują Sparks "Catalyst Optimizer, i powinny być w stanie przesunąć w dół zmniejszyć funkcję przed tasowaniem. Może to wyjaśniać, dlaczego w API 'Dataset' nie ma' reduceByKey'. Jednak z mojego doświadczenia wynika, że ​​tak nie jest i 'groupByKey.reduceGroups' tasuje znacznie więcej danych i jest znacznie wolniejsze niż' reduceByKey'. –

+4

Wygląda na to, że wydajność redukuj grupy została poprawiona z wersji 2.0.1 i 2.1.0 [Spark-16391] (https://issues.apache.org/jira/browse/SPARK-16391) – Franzi

3

Bardziej wydajne rozwiązanie wykorzystuje mapPartitions przed groupByKey zmniejszyć ilość tasowanie (uwaga to nie jest dokładnie taki sam podpis jak reduceByKey ale myślę, że jest bardziej elastyczny, aby przekazać funkcję niż wymagają zestaw danych składa się z krotki).

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V) 
    (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = { 
    def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = { 
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator 
    } 
    ds.mapPartitions(h(f, g, _)) 
    .groupByKey(f)(encK) 
    .reduceGroups(g) 
} 

w zależności od kształtu/wielkości swoich danych, to jest w ciągu 1 sekundy z wykonania reduceByKey io 2x tak szybko jak groupByKey(_._1).reduceGroups. Nadal jest miejsce na ulepszenia, więc sugestie byłyby mile widziane.