2015-01-27 6 views
5

Używam iskry z scala i mam RDD pełne tuple2 zawierające złożony obiekt jako klucz i podwójne. Celem jest dodanie podwójnej (częstotliwości), jeśli obiekt jest identyczny.reduceByKey przy użyciu obiektu Scala jako klucza

za to mam mojego obiektu zdefiniowano następująco:

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{ 
     def compare(that: SimpleCoocurrence) = { 
     if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos) 
      &&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos)) 
      0 
     else 
      this.toString.compareTo(that.toString) 
     } 
    } 

teraz próbuję wykorzystywać reduceBykey tak:

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(coocRDD.count) 

Ale wynik pokazuje, że przed i RDD po przetworzeniu redubykey zawiera dokładnie taką samą liczbę elementów.

Jak mogę wykonać reduByKey za pomocą tuple2 [SimpleCoocurrence, Double]? Czy wdrażanie cechy uporządkowanej jest dobrym sposobem, aby powiedzieć Sparkowi, jak porównać moje obiekty? Czy powinienem używać tylko tuple2 [String, Double]?

thx,

Odpowiedz

5

reduceByKey nie używa Kolejność ale hashCode i equals określić, jakie klawisze są takie same. W szczególności, hashPartitioner grupuje klucze za pomocą skrótu, klucze Sothat z tym samym hashCode spadają na tę samą partycję, dzięki czemu dalsze zmniejszenie może się zdarzyć na partycji.

klasy przypadków mają domyślną implementację equals i hashCode. Prawdopodobnie użyte dane testowe mają różne wartości pola distance:Double, dzięki czemu każda instancja jest unikalnym obiektem. Użycie go jako klucza spowoduje, że tylko identyczne obiekty zostaną zredukowane jako jeden.

Jednym ze sposobów rozwiązania tego problemu byłoby określenie klucza do case class i metody dodawania do obiektu, coś takiego:

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable { 
    val key = word + word_pos + cooc + cooc_pos 
} 
object SimpleCoocurrence { 
    val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ??? 
} 

val coocList:List[SimpleCoocurrence] = ??? 
val coocRDD = sc.parallelize(coocList) 
val coocByKey = coocRDD.keyBy(_.key) 
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add) 

(*) kod podany jako przykład prowadząc - nie skompilowany lub przetestowane .

+0

https://issues.apache.org/jira/browse/SPARK-10493 – yanghaogn

0

Po pierwsze, jestem głupi ...

Następnie, w przypadku gdy ktoś ma ten sam problem i chcą korzystać z obiektów złożonych Scala jako klucz do reduceByKey na Spark:

Spark potrafi porównać dwa obiekty, nawet jeśli nie realizują Zamówionych. Tak więc powyższy kod jest rzeczywiście fonctionnal.

Jedynym problemem było ... to, że drukowałem ten sam RDD przed i po. Kiedy to piszę, to faktycznie działa dobrze.

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
val newRDD = coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(newRDD.count) 
0

Nie przechowujesz wyników zmniejszeniaByKey. Spróbuj zamiast tego:

val coocRDD = sc.parallelize(coocList) 
println(coocRDD.count) 
val result = coocRDD.map(tup=>tup).reduceByKey(_+_) 
println(result.count)