2016-10-14 32 views
6

Jestem nowy zarówno Scala i Spark, więc mam nadzieję, że ktoś może dać mi znać gdzie idę źle tutaj.Spark „CodeGenerator: nie udało się skompilować” z Dataset.groupByKey

Mam zestaw danych trzy kolumny (id, nazwisko, rok) i chcę znaleźć w ostatnim roku, dla każdej nazwy. Innymi słowy:

BEFORE           AFTER 
| id_1 | name_1 | 2015 |      | id_2 | name_1 | 2016 | 
| id_2 | name_1 | 2016 |      | id_4 | name_2 | 2015 | 
| id_3 | name_1 | 2014 | 
| id_4 | name_2 | 2015 | 
| id_5 | name_2 | 2014 | 

Myślałem groupByKey i reduceGroups dostanie zadanie:

val latestYears = ds 
    .groupByKey(_.name) 
    .reduceGroups((left, right) => if (left.year > right.year) left else right) 
    .map(group => group._2) 

Ale daje ten błąd, i wypluwa dużo wygenerowanego kodu Java:

ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 21, Column 101: Unknown variable or type "value4" 

Co ciekawe, jeśli utworzę zestaw danych tylko z kolumnami nazwa i rok, działa on zgodnie z oczekiwaniami.


Oto pełny kod biegnę:

object App { 

    case class Record(id: String, name: String, year: Int) 

    def main(args: Array[String]) { 
    val spark = SparkSession.builder().master("local").appName("test").getOrCreate() 
    import spark.implicits._ 

    val ds = spark.createDataset[String](Seq(
     "id_1,name_1,2015", 
     "id_2,name_1,2016", 
     "id_3,name_1,2014", 
     "id_4,name_2,2015", 
     "id_5,name_2,2014" 
    )) 
     .map(line => { 
     val fields = line.split(",") 
     new Record(fields(0), fields(1), fields(2).toInt) 
     }) 

    val latestYears = ds 
     .groupByKey(_.name) 
     .reduceGroups((left, right) => if (left.year > right.year) left else right) 
     .map(group => group._2) 

    latestYears.show() 
    } 


} 

EDIT: wierzę, może to być błąd z Spark v2.0.1. Po przejściu na wersję v2.0.0 ta sytuacja już nie występuje.

+0

Ten sam problem tutaj, obejrzałem ten problem, przekształcając metodę reduceGroups(). Map (_._ 2) na grupę mapGroups (_. Reduce (_._ 2)). Czy zgłosiłeś już ten problem do listy wysyłkowej/śledzenia problemów? –

+0

To może być błąd, ale bardziej dotyczy mnie samego kodu. Dlaczego nie używasz 'groupBy' i' max' w 'year'? Używa jednak nietypowego API DataFrame (nie Dataset). Jakiś konkretny powód? –

Odpowiedz

0

Twoje funkcje groupBy i reduceGroups to experimental. Dlaczego nie użyć reduceByKey (api)?

Plusy:

  • To powinno być łatwe do przetłumaczenia z kodu masz.
  • Jest bardziej stabilny (nie eksperymentalny).
  • To powinno być bardziej efektywne, ponieważ nie wymagają pełnego odtwarzania losowego wszystkich elementów w każdej z grup (które mogą także tworzyć sieć I/O powolny upadki i przepełnienie pamięci w węźle).