2016-08-02 25 views
5

Mam do czynienia z kolumną liczb w dużej iskrze DataFrame, i chciałbym utworzyć nową kolumnę, która przechowuje zagregowaną listę unikalnych liczb, które pojawiają się w tej kolumnie.Czy istnieje sposób przekazywania parametru ograniczenia do functions.collect_set w Spark?

Zasadniczo dokładnie to, co robi function.collect_set. Jednak na zbiorczej liście potrzebuję tylko 1000 elementów. Czy istnieje jakiś sposób przekazania tego parametru w jakiś sposób do functions.collect_set(), lub w inny sposób, aby uzyskać tylko do 1000 elementów na liście zbiorczej, bez korzystania z UDAF?

Ponieważ kolumna jest tak duża, chciałabym uniknąć kolekcjonowania wszystkich elementów i późniejszego przycinania listy.

Dzięki!

Odpowiedz

1

użycie wziąć

val firstThousand = rdd.take(1000) 

zwróci pierwszy 1000. Collect posiada również funkcję filtrowania, które mogą być świadczone. To pozwoliłoby ci dokładniej określić, co jest zwracane.

+0

Dzięki za odpowiedź. Jednakże, 1) Chciałbym tylko listę wartości _distinct_. Widzę, że istnieje rdd.distinct(), ale nie ma on parametru ograniczenia 2) Nie wiem, jak użyć funkcji filtru w zbiorze. Jak użyć filtru, aby uzyskać tylko określoną liczbę wartości? – user1500142

+0

Również, najlepiej, chciałbym uniknąć używania rdds. Jestem obecnie coś jak df.groupBy(). Agg ( user1500142

1
scala> df.show 
    +---+-----+----+--------+ 
    | C0| C1| C2|  C3| 
    +---+-----+----+--------+ 
    | 10| Name|2016| Country| 
    | 11|Name1|2016|country1| 
    | 10| Name|2016| Country| 
    | 10| Name|2016| Country| 
    | 12|Name2|2017|Country2| 
    +---+-----+----+--------+ 

scala> df.groupBy("C1").agg(sum("C0")) 
res36: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res36.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
| Name|  30| 
+-----+-------+ 

scala> df.limit(2).groupBy("C1").agg(sum("C0")) 
    res33: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

    scala> res33.show 
    +-----+-------+ 
    | C1|sum(C0)| 
    +-----+-------+ 
    | Name|  10| 
    |Name1|  11| 
    +-----+-------+ 



    scala> df.groupBy("C1").agg(sum("C0")).limit(2) 
res2: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res2.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
+-----+-------+ 

scala> df.distinct 
res8: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res8.show 
+---+-----+----+--------+ 
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 10| Name|2016| Country| 
| 12|Name2|2017|Country2| 
+---+-----+----+--------+ 

scala> df.dropDuplicates(Array("c1")) 
res11: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res11.show 
+---+-----+----+--------+              
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 12|Name2|2017|Country2| 
| 10| Name|2016| Country| 
+---+-----+----+--------+ 
+0

Dzięki za odpowiedź, ale to nie całkiem robi to, co chcę. Jeśli chcę mieć do 1000 różnych wartości z kolumny, "df.limit (1000)" umieści twardą górną granicę na liczbie zwróconych wartości, ale mogę tracić różne wartości, które powinienem dodawać w inny sposób. – user1500142

+0

masz dwie metody odrębne i dropDuplicates, które można wykonać przed limitem, groupby i agg metod. W odróżnieniu od wszystkich kolumn i droDuplikatesów można kontrolować, które kolumny porównać, aby zidentyfikować duplikaty. @ user1500142 – mark

2

Używam zmodyfikowanej kopii funkcji collect_set i collect_list; ze względu na zakresy kodów zmodyfikowane kopie muszą znajdować się w tej samej ścieżce pakietu, co oryginały. Połączony kod działa dla Sparka 2.1.0; jeśli używasz poprzedniej wersji, sygnatury metody mogą być inne.

Rzut ten plik (https://gist.github.com/lokkju/06323e88746c85b2ce4de3ea9cdef9bc) do projektu jako src/main/org/apache/iskra/sql/katalizator/wypowiedzi/collect_limit.scala

używać go jako:

import org.apache.spark.sql.catalyst.expression.collect_limit._ 
df.groupBy('set_col).agg(collect_set_limit('set_col,1000) 
3

moje rozwiązanie jest bardzo podobny do Loki's answer with collect_set_limit.


użyję UDF, że zrobi to, co chcesz po collect_set (lub collect_list) lub znacznie trudniej UDAF.

Biorąc pod uwagę więcej doświadczeń z UDF, poszedłbym z tym pierwszy. Mimo że UDF nie są zoptymalizowane, w tym przypadku jest to w porządku.

val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) } 
val sample = spark.range(50).withColumn("key", $"id" % 5) 

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false) 
+---+--------------------------------------+ 
|key|all         | 
+---+--------------------------------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]| 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]| 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]| 
+---+--------------------------------------+ 

scala> sample. 
    groupBy("key"). 
    agg(collect_set("id") as "all"). 
    withColumn("limit(3)", limitUDF($"all", lit(3))). 
    show(false) 
+---+--------------------------------------+------------+ 
|key|all         |limit(3) | 
+---+--------------------------------------+------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] | 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] | 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] | 
+---+--------------------------------------+------------+ 

Zobacz functions obiekt (na docs udf funkcyjnego).