ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
Cel: Uzyskanie rekomendacji dla wszystkich użytkowników za pomocą modelu i pokrywają się z danymi testu każdego użytkownika i generują współczynnik nakładania się.
Mam zbudować model rekomendacji za pomocą iskry mllib. Oceniam relację nakładania się danych testowych na użytkownika i zalecane pozycje na użytkownika i generuję średni współczynnik nakładania się.
def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val overlaps = testData.join(recommendations).map(x => {
val moviesPerUserInRecs = x._2._2.toSet
val moviesPerUserInTest = x._2._1.toSet
val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
if(localHitRatio.size > 0)
1
else
0
}).filter(x => x != 0).count
var r = 0.0
if (overlaps != 0)
r = overlaps/n
return r
}
Ale tu problem jest to, że kończy się wyrzuceniem powyżej maxResultSize
błędu. W mojej konfiguracji iskier podążyłem, aby powiększyć maxResultSize
.
val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")
Ale to nie rozwiąże problemu, zbliżyłem się prawie do kwoty, którą przydzielam pamięci sterownika, ale problem nie został rozwiązany. Podczas gdy kod jest wykonywany, nie odrywałem wzroku od mojej iskierki i to, co zobaczyłem, jest nieco zagadkowe.
[Stage 281:==> (47807 + 100)/1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)
W powyższym kodzie etapem jest wykonanie kodu MatrixFactorization w zapłonowej-mllib recommendForAll
wokół line 277
(nie dokładnie upewnić się, że numer linii).
private def recommendForAll(
rank: Int,
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val ratings = srcFactors.transpose.multiply(dstFactors)
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
ratings.foreachActive { (i, j, r) =>
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
}
ratings.topByKey(num)(Ordering.by(_._2))
}
recommendForAll
metoda nazywa się z recommendProductsForUsers
metody.
Ale wygląda na to, że metoda polega na rezygnacji z zadań 1M. Dane, które pobierają, pochodzą z 2000 plików części, więc nie wiem, jak zaczęły pluć zadania 1M i myślę, że to może być problem.
Moje pytanie brzmi: jak mogę faktycznie rozwiązać ten problem. Bez użycia tego podejścia bardzo ciężko obliczyć overlap ratio
lub [email protected]
. To jest iskra 1,5 (Cloudera 5.5)