2016-03-26 24 views
9

Tak, mam DataFrame w Spark, który wygląda tak:Konwersja Dataframe na mapie (klucz-wartość) w Spark

Ma 30 kolumn: tylko pokazano niektóre z nich!

[ABCD,color,NORMAL,N,2015-02-20,1] 
[XYZA,color,NORMAL,N,2015-05-04,1] 
[GFFD,color,NORMAL,N,2015-07-03,1] 
[NAAS,color,NORMAL,N,2015-08-26,1] 
[LOWW,color,NORMAL,N,2015-09-26,1] 
[KARA,color,NORMAL,N,2015-11-08,1] 
[ALEQ,color,NORMAL,N,2015-12-04,1] 
[VDDE,size,NORMAL,N,2015-12-23,1] 
[QWER,color,NORMAL,N,2016-01-18,1] 
[KDSS,color,NORMAL,Y,2015-08-29,1] 
[KSDS,color,NORMAL,Y,2015-08-29,1] 
[ADSS,color,NORMAL,Y,2015-08-29,1] 
[BDSS,runn,NORMAL,Y,2015-08-29,1] 
[EDSS,color,NORMAL,Y,2015-08-29,1] 

Więc muszę konwertować ten dataFrame na klucz-wartość Pair w Scala, używając klucza, jak niektóre z kolumn w Dataframe i przypisywanie unikatowych wartości do tych kluczy z indeksem 0 do count (odrębną liczba kluczy).

Na przykład: przy użyciu powyższym przypadku, chcę mieć wyjście w mapę (klucz-wartość) w kolekcji Scala takiego:

([ABC_color_NORMAL_N_1->0] 
    [XYZA_color_NORMAL_N_1->1] 
    [GFFD_color_NORMAL_N_1->2] 
    [NAAS_color_NORMAL_N_1->3] 
    [LOWW_color_NORMAL_N_1->4] 
    [KARA_color_NORMAL_N_1->5] 
    [ALEQ_color_NORMAL_N_1->6] 
    [VDDE_size_NORMAL_N_1->7] 
    [QWER_color_NORMAL_N_1->8] 
    [KDSS_color_NORMAL_Y_1->9] 
    [KSDS_color_NORMAL_Y_1->10] 
    [ADSS_color_NORMAL_Y_1->11] 
    [BDSS_runn_NORMAL_Y_1->12] 
    [EDSS_color_NORMAL_Y_1->13] 
    ) 

Jestem nowy Scala i iskra i starałem robienie czegoś takiego.

var map: Map[String, Int] = Map() 
    var i = 0 
    dataframe.foreach(record =>{ 
    //Is there a better way of creating a key! 
     val key = record(0) + record(1) + record(2) + record(3) 
     var index = i 
     map += (key -> index) 
     i+=1 
      } 
     ) 

Ale to nie działa .:/Mapa jest pusta po zakończeniu.

Odpowiedz

9

Głównym problemem w kodzie próbuje modyfikować zmienną utworzoną na stronie kierowcy w ramach kodu wykonywanego na pracowników. Używając Sparka, możesz używać zmiennych po stronie sterownika w transformacjach RDD tylko jako wartości "tylko do odczytu".

Konkretnie:

  • Mapa jest tworzona na maszynie kierowcy
  • mapie (z początkowej, pustej wartości) jest odcinkach i wysyłane do węzłów robotnic
  • Każdy węzeł może zmienić map (lokalnie)
  • Wynik jest wyrzucany po wykonaniu foreach - wynikiem jest , a nie wysłany do sterownika.

Aby rozwiązać ten problem - należy wybrać transformację, która zwraca zmieniony RDD (np map), aby utworzyć klucze, użyj zipWithIndex dodać Running „identyfikatory”, a następnie użyj collectAsMap uzyskać wszystkie dane z powrotem do kierowca jako mapę:

val result: Map[String, Long] = dataframe 
    .map(record => record(0) + record(1) + record(2) + record(3)) 
    .zipWithIndex() 
    .collectAsMap() 

co do samego stworzenia klucza - zakładając, że chcesz dołączyć pierwszych 5 kolumn i dodać separator (_) między nimi, można użyć:

record => record.toList.take(5).mkString("_") 
+0

Dzięki za wspaniałą odpowiedź! Jestem nowy w iskrze/scala i próbuję zrobić to samo w moim kodzie, jedyną różnicą jest to, że mam dwie kolumny w mojej Dataframe i próbuję zrobić to z mapą z jedną kolumną będącą kluczem i drugą jako wartość. np: kolumna1, kolumna2 => Mapa ("kolumna1" -> "kolumna2", "kolumna1" -> "kolumna2", ....). Czy jest jakiś sposób na zrobienie tego? Każda pomoc będzie doceniona. –

+0

zobacz "org.apache.spark.sql.functions.map", aby utworzyć kolumnę z mapą –

+0

dzięki za odpowiedź! Czy jest dostępny w wersji iskry 1.6? To jest wersja, której używam .. Próbowałem ją zaimportować, ale wygląda na to, że nie jest dostępna :(Próbowałem również w taki sam sposób jak podano w wysłanym pytaniu, ale daje mi ten błąd 'java.lang.NoSuchMethodError: scala.Predef $ .ArrowAssoc (Ljava/lang/Object;) Ljava/lang/Object; 'na linii gdzie próbuję wziąć drugi element kolumny jako ciąg .. –