2017-01-27 52 views
10

Próbuję wykonać wiele operacji w jednym wierszu kodu w pySpark, i nie jestem pewien, czy jest to możliwe w moim przypadku.funkcja agregacji Zliczanie użycia z groupBy w Spark

Moim zamiarem nie jest zapisywanie wyniku jako nowej ramki danych.

Mój obecny kod jest dość prosty:

encodeUDF = udf(encode_time, StringType()) 
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME'))) 
    .groupBy('timePeriod') 
    .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"), 
    stddev('DOWNSTREAM_SIZE').alias("Stddev") 
) 
    .show(20, False) 

A moim zamiarem jest dodanie count() po użyciu groupBy, aby uzyskać dobrze, liczba rekordów pasujących do każdego wartość timePeriod kolumny, drukowane \ pokazane jako wynik.

Podczas próby użycia groupBy(..).count().agg(..) otrzymuję wyjątki.

jest jakiś sposób na osiągnięcie zarówno count() i agg() .pokaż() odbitki bez kodu rozdzielającego dwie linie rozkazu, np :

new_log_df.withColumn(..).groupBy(..).count() 
new_log_df.withColumn(..).groupBy(..).agg(..).show() 

Albo jeszcze lepiej, bo coraz wyjście połączyły agg.show() wyjścia - dodatkową kolumnę w którym stwierdza się liczono liczbę rekordów pasujących wartości w wierszu. np .:

timePeriod | Mean | Stddev | Num Of Records 
    X  | 10 | 20 | 315 
+0

'new_log_df.withColumn (..). GroupBy (..). Agg (count (1)) show()'? – mrsrinivas

+0

Co oznacza '1' w' count (1) '? i czy mogę użyć count() wewnątrz agg() wraz z innymi terminami wymienionymi w moim kodzie? – Adiel

+0

Podczas próby użycia 'agg (count (1), mean (..), stddev (..)). Show()' otrzymuję ** NameError: name 'count' nie jest zdefiniowana ** – Adiel

Odpowiedz

18

count() mogą być wykorzystywane wewnątrz agg() jako groupBy ekspresji są takie same.

Pythona

import pyspark.sql.functions as func 

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
    .groupBy("timePeriod") 
    .agg(
    func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
    func.stddev("DOWNSTREAM_SIZE").alias("Stddev"), 
    func.count(func.lit(1)).alias("Num Of Records") 
    ) 
    .show(20, False) 

pySpark SQL functions doc

Z Scala

import org.apache.spark.sql.functions._ //for count() 

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
    .groupBy("timePeriod") 
    .agg(
    mean("DOWNSTREAM_SIZE").alias("Mean"), 
    stddev("DOWNSTREAM_SIZE").alias("Stddev"), 
    count(lit(1)).alias("Num Of Records") 
    ) 
    .show(20, false) 

count(1) liczą rekordy autorem pierwszej kolumny, która jest równa count("timePeriod")

Z Java

import static org.apache.spark.sql.functions.*; 

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
    .groupBy("timePeriod") 
    .agg(
    mean("DOWNSTREAM_SIZE").alias("Mean"), 
    stddev("DOWNSTREAM_SIZE").alias("Stddev"), 
    count(lit(1)).alias("Num Of Records") 
    ) 
    .show(20, false)