Używam Spark 1.6.1 i napotykam dziwne zachowanie: Używam UDF z ciężkimi obliczeniami (symulacje fizyki) na ramie zawierającej dane niektóre dane wejściowe i budowanie wyników - Dataframe zawierające wiele kolumn (~ 40).Spark UDF wywoływany więcej niż jeden raz na rekord, gdy DF ma zbyt wiele kolumn
Co dziwne, mój UDF jest wywoływany więcej niż raz na rekord mojej wejściowej ramki danych w tym przypadku (1,6 razy częściej), co uważam za niedopuszczalne, ponieważ jest bardzo drogie. Jeśli zmniejszę liczbę kolumn (na przykład do 20), to zachowanie zniknie.
udało mi się zanotować niewielki skrypt, który demonstruje w ten sposób:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo {
case class Result(a: Double)
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val numRuns = sc.accumulator(0) // to count the number of udf calls
val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)})
val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")
// get results of UDF
var results = data
.withColumn("tmp", myUdf($"id"))
.withColumn("result", $"tmp.a")
// add many columns to dataframe (must depend on the UDF's result)
for (i <- 1 to 42) {
results=results.withColumn(s"col_$i",$"result")
}
// trigger action
val res = results.collect()
println(res.size) // prints 100
println(numRuns.value) // prints 160
}
}
Teraz, czy jest jakiś sposób aby rozwiązać ten problem bez zmniejszania liczby kolumn?
To faktycznie działa! Nadal czekam z zaakceptowaniem odpowiedzi, może ktoś ma wyczerpującą odpowiedź –
Tak, też jestem ciekawy - idealnie OK, z tobą nie akceptuję :) –