2015-12-02 7 views
7

błąd:SparkError: Całkowita wielkość odcinkach wyników zadań XXXX (2,0 GB) jest większy niż spark.driver.maxResultSize (2,0 GB)

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB) 

Cel: Uzyskanie rekomendacji dla wszystkich użytkowników za pomocą modelu i pokrywają się z danymi testu każdego użytkownika i generują współczynnik nakładania się.

Mam zbudować model rekomendacji za pomocą iskry mllib. Oceniam relację nakładania się danych testowych na użytkownika i zalecane pozycje na użytkownika i generuję średni współczynnik nakładania się.

def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = { 

    val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey 
    val n = testData.count 

    val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20) 
     .mapValues(_.map(r => r.product)) 

    val overlaps = testData.join(recommendations).map(x => { 
     val moviesPerUserInRecs = x._2._2.toSet 
     val moviesPerUserInTest = x._2._1.toSet 
     val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest) 
     if(localHitRatio.size > 0) 
     1 
     else 
     0 
    }).filter(x => x != 0).count 

    var r = 0.0 
    if (overlaps != 0) 
     r = overlaps/n 

    return r 

    } 

Ale tu problem jest to, że kończy się wyrzuceniem powyżej maxResultSize błędu. W mojej konfiguracji iskier podążyłem, aby powiększyć maxResultSize.

val conf = new SparkConf() 
conf.set("spark.driver.maxResultSize", "6g") 

Ale to nie rozwiąże problemu, zbliżyłem się prawie do kwoty, którą przydzielam pamięci sterownika, ale problem nie został rozwiązany. Podczas gdy kod jest wykonywany, nie odrywałem wzroku od mojej iskierki i to, co zobaczyłem, jest nieco zagadkowe.

[Stage 281:==> (47807 + 100)/1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB) 

W powyższym kodzie etapem jest wykonanie kodu MatrixFactorization w zapłonowej-mllib recommendForAll wokół line 277 (nie dokładnie upewnić się, że numer linii).

private def recommendForAll(
     rank: Int, 
     srcFeatures: RDD[(Int, Array[Double])], 
     dstFeatures: RDD[(Int, Array[Double])], 
     num: Int): RDD[(Int, Array[(Int, Double)])] = { 
    val srcBlocks = blockify(rank, srcFeatures) 
    val dstBlocks = blockify(rank, dstFeatures) 
    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { 
     case ((srcIds, srcFactors), (dstIds, dstFactors)) => 
     val m = srcIds.length 
     val n = dstIds.length 
     val ratings = srcFactors.transpose.multiply(dstFactors) 
     val output = new Array[(Int, (Int, Double))](m * n) 
     var k = 0 
     ratings.foreachActive { (i, j, r) => 
      output(k) = (srcIds(i), (dstIds(j), r)) 
      k += 1 
     } 
     output.toSeq 
    } 
    ratings.topByKey(num)(Ordering.by(_._2)) 
    } 

recommendForAll metoda nazywa się z recommendProductsForUsers metody.

Ale wygląda na to, że metoda polega na rezygnacji z zadań 1M. Dane, które pobierają, pochodzą z 2000 plików części, więc nie wiem, jak zaczęły pluć zadania 1M i myślę, że to może być problem.

Moje pytanie brzmi: jak mogę faktycznie rozwiązać ten problem. Bez użycia tego podejścia bardzo ciężko obliczyć overlap ratio lub [email protected]. To jest iskra 1,5 (Cloudera 5.5)

Odpowiedz

0

problem 2GB nie jest niczym nowym dla społeczności Spark: https://issues.apache.org/jira/browse/SPARK-6235

RE/rozmiar partycji większej niż 2 GB, spróbuj ponownie podzielić na partycje (myRdd.repartition(parallelism)) swój RDD do większej liczby partycji (w/r/t/aktualny poziom równoległości), zmniejszając w ten sposób rozmiar każdej pojedynczej partycji.

Re/liczba zadań obróconych (stąd utworzonych partycji), moja hipoteza jest taka, że ​​może wyjść z połączenia API srcBlocks.cartesian(dstBlocks), które tworzy wyjściowy RDD z (z = srcBlocks liczbę partycji * dstBlocks liczbę partycji) partycje.

W takim przypadku można rozważyć zastosowanie interfejsu API myRdd.coalesce(parallelism) zamiast repartition, aby uniknąć tasowania (i problemów związanych z seralizacją partycji).