2016-02-09 27 views
10

Dlaczego dopasowywanie wzorców w Spark nie działa tak samo jak w Scali? Zobacz przykład poniżej ... funkcja f() próbuje dopasować do wzorca na klasie, która działa w Scala REPL, ale kończy się niepowodzeniem w Sparku i daje w wyniku wszystkie "???". f2() to obejście, które uzyskuje pożądany wynik w Sparku przy użyciu .isInstanceOf(), ale rozumiem, że jest to zła forma w Scali.Równość klasy Case w Apache Spark

Każda pomoc dotycząca wzoru pasującego do prawidłowego sposobu w tym scenariuszu w Sparku byłaby bardzo doceniana.

abstract class a extends Serializable {val a: Int} 
case class b(a: Int) extends a 
case class bNull(a: Int=0) extends a 

val x: List[a] = List(b(0), b(1), bNull()) 
val xRdd = sc.parallelize(x) 

próba dopasowania wzorca, który działa w Scala REPL ale nie w Spark

def f(x: a) = x match { 
    case b(n) => "b" 
    case bNull(n) => "bnull" 
    case _ => "???" 
} 

obejścia, który funkcjonuje w Spark, ale jest zła forma (chyba)

def f2(x: a) = { 
    if (x.isInstanceOf[b]) { 
     "b" 
    } else if (x.isInstanceOf[bNull]) { 
     "bnull" 
    } else { 
     "???" 
    } 
} 

Zobacz wyniki

xRdd.map(f).collect     //does not work in Spark 
             // result: Array("???", "???", "???") 
xRdd.map(f2).collect     // works in Spark 
             // resut: Array("b", "b", "bnull") 
x.map(f(_))       // works in Scala REPL  
             // result: List("b", "b", "bnull") 

Wersje stosowane ... Wyniki zapłonowe prowadzone w zapłonowej-shell (Spark 1.6 na AWS EMR-4.3) Scala REPL w SBT 0.13.9 2.10.5 (Scala)

Odpowiedz

15

Jest to znany problem z Spark REPL. Więcej informacji znajdziesz w SPARK-2620. Wpływa na wiele operacji w Spark REPL, w tym większość transformacji na PairwiseRDDs. Na przykład:

case class Foo(x: Int) 

val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2)) 
foos.distinct.size 
// Int = 2 

val foosRdd = sc.parallelize(foos, 4) 
foosRdd.distinct.count 
// Long = 4 

foosRdd.map((_, 1)).reduceByKey(_ + _).collect 
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1)) 

foosRdd.first == foos.head 
// Boolean = false 

Foo.unapply(foosRdd.first) == Foo.unapply(foos.head) 
// Boolean = true 

Co czyni go jeszcze gorsze jest to, że rezultaty zależą od rozkładu danych:

sc.parallelize(foos, 1).distinct.count 
// Long = 2 

sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect 
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2)) 

Najprostszą rzeczą, jaką możesz zrobić, to zdefiniować i pakiet wymaganych klas przypadków poza REPL. Każdy kod przesłany bezpośrednio przy użyciu spark-submit powinien również działać.

W Scali 2.11+ możesz utworzyć pakiet bezpośrednio w REPL pod numerem paste -raw.

scala> :paste -raw 
// Entering paste mode (ctrl-D to finish) 

package bar 

case class Bar(x: Int) 


// Exiting paste mode, now interpreting. 

scala> import bar.Bar 
import bar.Bar 

scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect 
res1: Array[bar.Bar] = Array(Bar(1), Bar(2)) 
+0

Dzięki zero323! Widzę wzmianki o dopasowywaniu wzorów, które nie działają w powłoce iskrzenia, ale nie ma szczegółów ... mówisz, że jeśli zdefiniuję moje klasy przypadków w słoiku, to będę w stanie dopasować do nich wzór w REPL? Dzięki jeszcze raz! – kmh

+1

Dokładnie. Zdefiniuj zewnętrzne, zbuduj słoik, dołącz do 'CLASSPATH' i importuj. – zero323

+0

Idealny! Dzięki jeszcze raz! – kmh