Próbuję porównać różne sposoby agregowania moich danych.Spark: Jak tłumaczyć count (odrębny (wartość)) w API Dataframe API
To moje dane wejściowe z 2 elementów (strona, gość):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Praca z polecenia SQL w Spark SQL z tym kodem:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
uzyskać ten wynik:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Teraz chciałbym uzyskać ten sam wynik przy użyciu Dataframes i interfejsu API, ale nie mogę uzyskać tego samego Wyjście:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
W rzeczywistości, to co mam na wyjściu:
[PAG1,8] // just the simple page count for every page
[PAG2,4]
To chyba coś głupi, ale nie widzę go teraz.
Z góry dziękuję!
FF
otrzymuję ten błąd -> Nie znaleziono: wartość countDistinct –
jest to metoda w 'org.apache.spark.sql .funkcje', importuj to :), edytuj gotowe. –
z IntelliJ Muszę napisać polecenie agg/countDistinct podobne do tego .agg (org.apache.spark.sql.functions.countDistinct ("visitor")), ponieważ nawet jeśli zaimportowałem org.apache.spark.sql. funkcje nadal daje mi ten sam błąd ... w każdym razie to działa, ale mam tylko kolumnę odwiedzającego i brak kolumny strony ([2], [3]) ... czego mi brakuje? –