2016-11-30 34 views
10

Mam strukturę RDDSpark: RDD do listy

RDD[(String, String)] 

i chcę utworzyć 2 listy (po jednym dla każdego wymiaru RDD).

Próbowałem użyć rdd.foreach() i wypełnić dwa ListBuffers, a następnie przekonwertować je na listy, ale przypuszczam, że każdy węzeł tworzy swój własny ListBuffer, ponieważ po iteracji lista BufferLists jest pusta. Jak mogę to zrobić ?

EDIT: moje podejście

val labeled = data_labeled.map { line => 
    val parts = line.split(',') 
    (parts(5), parts(7)) 
}.cache() 

var testList : ListBuffer[String] = new ListBuffer() 

labeled.foreach(line => 
    testList += line._1 
) 
    val labeledList = testList.toList 
    println("rdd: " + labeled.count) 
    println("bufferList: " + testList.size) 
    println("list: " + labeledList.size) 

a wynik jest:

rdd: 31990654 
bufferList: 0 
list: 0 
+1

Proszę zaktualizować z kodem, czego próbowałem i niektóre dane wejściowe próbki i oczekiwany wynik! Twoje pytanie nie jest dla mnie jasne. – eliasah

Odpowiedz

9

Jeśli naprawdę chcesz, aby utworzyć dwa Listy - znaczenie, chcesz wszystkie rozproszone dane mają być gromadzone na aplikacja sterownika (ryzyko spowolnienia lub OutOfMemoryError) - można użyć collect, a następnie użyć prostych operacji map na wyniku:

val list: List[(String, String)] = rdd.collect().toList 
val col1: List[String] = list.map(_._1) 
val col2: List[String] = list.map(_._2) 

Alternatywnie - jeśli chcesz „split” Twój RDD na dwie RDD - to całkiem podobny bez zbierania danych:

rdd.cache() // to make sure calculation of rdd is not repeated twice 
val rdd1: RDD[String] = rdd.map(_._1) 
val rdd2: RDD[String] = rdd.map(_._2) 

Trzecią alternatywą jest do pierwszej mapy do tych dwóch RDD i następnie zbieraj każdego z nich, ale nie różni się to zbytnio od pierwszej opcji i cierpi z powodu tego samego ryzyka i ograniczeń.

+0

@ Yuriy Jaka jest tu zmienna broadcast (która jest tylko do odczytu)? Czy możesz opisać to bardziej? – avr

+0

@avr ListBuffer jest mutable i '+ =' mutate inner state, nie twórz nowych referencji. Ale twoje pytanie jest dobre, a dla niezmiennego stwierdzenia (gdzie odniesienie zmienia się dla jakiejkolwiek operacji) trzeba go owinąć czymś (Serializable). Prosty przykład dla listy: 'val testList = sc.broadcast (nowy Serializable {var list = List.empty [String]})' i po zmutowaniu stanu wewnętrznego. – Yuriy

+0

@ Yuriy Myślę, że avr ma rację i źle zrozumiałeś jego/jej pytanie - nie jest to kwestia mutable versus immutable collection - zmienne broadcast są _read only_ w tym sensie, że jeśli ich wartości są zmieniane na executorze, kod kierowcy wygrał ' t zobacz tę zmianę (w jaki sposób Spark będzie agregował zmiany wprowadzone przez wszystkie executory?). Fakt, że działa to w trybie lokalnym wygląda jak błąd, nie zadziała tam, gdzie klaster jest faktycznie dystrybuowany. –

1

Jako alternatywę do odpowiedzi Tzach Zohar, można użyć unzip na listach:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d"))) 
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val (l1, l2) = myRDD.collect.toList.unzip 
l1: List[String] = List(a, c) 
l2: List[String] = List(b, d) 

Albo keys i values na RDD s:

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values) 
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33 
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33 

scala> rdd1.foreach{println} 
a 
c 

scala> rdd2.foreach{println} 
d 
b