2016-08-05 52 views
6

Chcę wykonać działanie w trybie RDD, takie jak reduce, ale nie trzeba, aby operator był przemienny. tj. chcę, aby result na następnej stronie zawsze będzie "123456789".Czy w RDD są jakieś działania dotyczące zamówienia?

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24 

scala> val result = rdd.someAction{ _+_ } 

Najpierw znalazłem fold. Doc z RDD#fold mówi:

def krotnie (zeroValue T) (PO: (T, T) ⇒ T) T zbiorczej elementy każdej partycji, a następnie wyniki dla wszystkich przegród, przy użyciu podane asocjacyjne funkcja i neutralną „wartość zero”

Zauważ, że nie ma przemienne potrzebne w dok. Jednak wynik nie jest zgodnie z oczekiwaniami:

scala> rdd.fold(""){ _+_ } 
res10: String = 312456879 

EDIT Próbowałem jak wspomniano przez @ dk14, bez powodzenia:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res22: String = 341276895 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res23: String = 914856273 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res24: String = 742539618 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res25: String = 271468359 
+0

Tęskniłeś za następną sekcją dokumentów, która opisuje to, co widzisz: * "To działa nieco inaczej niż operacje składane zaimplementowane dla nieproszonych zbiorów w językach funkcjonalnych, takich jak Scala. partycje pojedynczo, a następnie zawijać te wyniki w końcowy wynik, zamiast stosować fałdę do każdego elementu sekwencyjnie w określonym porządku. W przypadku funkcji, które nie są przemienne, wynik może różnić się od wyniku fałdu zastosowanego do kolekcji niepodzielonej "* –

Odpowiedz

2

Nie ma wbudowaną redukcję działania, który spełnia te kryteria w Scala, ale można łatwo zaimplementować własną łącząc mapPartitions, collect i lokalne redukcje :

import scala.reflect.ClassTag 

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = { 
    rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f) 
} 

Wykorzystanie do scalania, a nie metody asynchronicznego i nieuporządkowanym używany przez fold kombinacji collect i reduce zapewnia globalny rozkaz jest zachowana.

To oczywiście pochodzi z jakiegoś dodatkowych kosztów, w tym:

  • nieco wyższe zużycie pamięci w sterowniku.
  • znacznie większe opóźnienie - wyraźnie czekamy na zakończenie wszystkich zadań przed rozpoczęciem lokalnej redukcji.
+0

Dziękuję za twoją pomoc, czy to oznacza, że ​​każda partycja ** jest zawsze ciągłą podsekcją ** całego RDD? Czy jest jakiś wzmiankowany dokument? – Eastsun

+0

Jeśli chodzi o dokumenty - nie jestem tego świadomy. Jest on jednak mniej lub bardziej ograniczony przez model i kontrakty niektórych uporządkowanych metod. Prawdziwym problemem w Sparku jest ustalenie ogólnej sekwencji. Zasadniczo istnieją dwa przypadki, gdy powód bierze pod uwagę kolejność a) w przypadku korzystania z sortowania jawnego (według kontraktu) b), gdy użytkownik posiada dane wejściowe generujące deterministyczne uporządkowane podziały i nie ma tasowania i innych przesunięć danych między wejściem a bieżącym punktem. – zero323

1

Jak podkreślił @YuvalItzchakov fold nie zachowuje kolejność w partycjonowanym RDD podczas łączenia wyników. Aby zilustrować to rozważyć koalescencyjny oryginalny RDD do jednego tylko partycję,

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1) 
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res4: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res5: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res6: String = 123456789 
+0

Należy zauważyć, że zrobienie tego będzie miało tę wadę, że całkowicie straci zdolność równoległości obliczeń. –

+0

@YuvalItzchakov określony; z 'fold', zamawianie może nie zostać zachowane w partycjonowanym' RDD'. – elm

+0

Tak, rozumiem. Ale PO powinien być tego świadomy. –