Próbuję przeprowadzić analizę na zestawach. Mam przykładowy zestaw danych, który wygląda tak:Spark grupowanie danych grupowych na listę
orders.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
Wszystko to jest, to pojedyncze pole, które znajduje się lista numerów, które reprezentują identyfikatory.
Oto skrypt Spark próbuję uruchomić:
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)
val dataframe = sql.read.json("orders.json")
val expanded = dataframe
.explode[::[Long], Long]("items", "item1")(row => row)
.explode[::[Long], Long]("items", "item2")(row => row)
val grouped = expanded
.where(expanded("item1") !== expanded("item2"))
.groupBy("item1", "item2")
.count()
val recs = grouped
.groupBy("item1")
Tworzenie expanded
i grouped
jest w porządku, w skrócie expanded
znajduje się lista wszystkich możliwych zestawów dwóch identyfikatorów, gdzie dwa identyfikatory były w ten sam oryginalny zestaw. grouped
filtruje identyfikatory, które zostały dopasowane do siebie, a następnie grupuje wszystkie unikalne pary identyfikatorów i generuje liczbę dla każdego. Schemat i dane próbka grouped
są:
root
|-- item1: long (nullable = true)
|-- item2: long (nullable = true)
|-- count: long (nullable = false)
[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
Więc moje pytanie brzmi: w jaki sposób mogę teraz grupa na pierwszej pozycji w każdym związku, tak że mam listę krotek? Na przykład dane powyżej, chciałbym się spodziewać czegoś podobnego do tego:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
Jak widać w moim skrypcie z recs
, myślałem, że zaczniesz wykonując GroupBy na „item1”, który jest pierwszym elementem w każdy rząd. Ale po tym jesteś z tym obiektem GroupedData, który ma bardzo ograniczone działania na nim. Naprawdę, pozostaje ci tylko robić agregacje, takie jak suma, śr. Itp. Chcę tylko wyświetlić listę krotek z każdego wyniku.
Mogę łatwo korzystać z funkcji RDD w tym momencie, ale to odbiega od korzystania z Dataframes. Czy istnieje sposób, aby to zrobić z funkcjami ramek danych.