2016-10-29 18 views
6

Używam Spark 1.6.1 i napotykam dziwne zachowanie: Używam UDF z ciężkimi obliczeniami (symulacje fizyki) na ramie zawierającej dane niektóre dane wejściowe i budowanie wyników - Dataframe zawierające wiele kolumn (~ 40).Spark UDF wywoływany więcej niż jeden raz na rekord, gdy DF ma zbyt wiele kolumn

Co dziwne, mój UDF jest wywoływany więcej niż raz na rekord mojej wejściowej ramki danych w tym przypadku (1,6 razy częściej), co uważam za niedopuszczalne, ponieważ jest bardzo drogie. Jeśli zmniejszę liczbę kolumn (na przykład do 20), to zachowanie zniknie.

udało mi się zanotować niewielki skrypt, który demonstruje w ten sposób:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 


object Demo { 

    case class Result(a: Double) 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]")) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val numRuns = sc.accumulator(0) // to count the number of udf calls 

    val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)}) 

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id") 

    // get results of UDF 
    var results = data 
     .withColumn("tmp", myUdf($"id")) 
     .withColumn("result", $"tmp.a") 


    // add many columns to dataframe (must depend on the UDF's result) 
    for (i <- 1 to 42) { 
     results=results.withColumn(s"col_$i",$"result") 
    } 

    // trigger action 
    val res = results.collect() 
    println(res.size) // prints 100 

    println(numRuns.value) // prints 160 

    } 
} 

Teraz, czy jest jakiś sposób aby rozwiązać ten problem bez zmniejszania liczby kolumn?

Odpowiedz

4

Nie mogę naprawdę wyjaśnić tego zachowania - ale oczywiście plan zapytania w jakiś sposób wybiera ścieżkę, w której niektóre rekordy są obliczane dwukrotnie. Oznacza to, że jeśli osiągniemy wynik pośredni (zaraz po zastosowaniu UDF), możemy być w stanie "zmusić" Spark, aby nie przeliczał UDF. I rzeczywiście, gdy buforowanie dodaje zachowuje się zgodnie z oczekiwaniami - UDF nazywa się dokładnie 100 razy:

// get results of UDF 
var results = data 
    .withColumn("tmp", myUdf($"id")) 
    .withColumn("result", $"tmp.a").cache() 

oczywiście buforowanie ma swoje własne koszty (pamięć ...), ale to może skończyć się korzystne w przypadku jeśli zapisuje wiele wywołań UDF.

+0

To faktycznie działa! Nadal czekam z zaakceptowaniem odpowiedzi, może ktoś ma wyczerpującą odpowiedź –

+0

Tak, też jestem ciekawy - idealnie OK, z tobą nie akceptuję :) –

4

Mieliśmy ten sam problem około rok temu i spędziliśmy dużo czasu, aż w końcu zorientowaliśmy się, na czym polegał problem.

Mieliśmy również bardzo kosztowny UDF do obliczenia i dowiedzieliśmy się, że oblicza się go ponownie za każdym razem, gdy odnosimy się do jego kolumny. Jej po prostu się do nas ponownie kilka dni temu, więc postanowiłem otworzyć błąd w tym: SPARK-18748

również otwarte pytanie tutaj wtedy, ale teraz widzę tytuł nie był tak dobry: Trying to turn a blob into multiple columns in Spark

Zgadzam się z Tzach o "forsowaniu" planu obliczania UDF. Zrobiliśmy to brzydsze, ale musieliśmy, ponieważ nie mogliśmy cache() dane - był zbyt duży:

val df = data.withColumn("tmp", myUdf($"id")) 
val results = sqlContext.createDataFrame(df.rdd, df.schema) 
      .withColumn("result", $"tmp.a") 

zmiana:

Teraz widzę, że mój bilet JIRA był związany z inną jeden: SPARK-17728, który nadal nie poradził sobie z tą kwestią we właściwy sposób, ale daje jeszcze jedną dodatkową pracę:

val results = data.withColumn("tmp", explode(array(myUdf($"id")))) 
        .withColumn("result", $"tmp.a") 
+0

dzięki za udostępnienie! – twoface88