2017-08-21 56 views
5

Mam zestaw danych składający się z kolumny sygnatury czasowej i kolumny w dolarach. Chciałbym znaleźć średnią liczbę dolarów tygodniowo kończącą się znacznikiem czasu każdego rzędu. Początkowo patrzyłem na funkcję pyspark.sql.functions.window, ale te dane są wymieniane na tydzień.pyspark: średnia krocząca przy użyciu danych timeseries

Oto przykład:

%pyspark 
import datetime 
from pyspark.sql import functions as F 

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"]) 
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp')) 

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg')) 
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect() 

Wynika to z dwóch zapisów:

|  start  |   end   | avg | 
|---------------------|----------------------|-----| 
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0| 
|---------------------|----------------------|-----| 
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0| 
|---------------------|----------------------|-----| 

Funkcja okna binned dane serii czas zamiast wykonywać średnia krocząca.

Czy istnieje sposób na wyliczenie średniej kroczącej, skąd otrzymam tygodniową średnią dla każdego wiersza z przedziałem czasu kończącym się na znaczniku czasu GMT wiersza?

EDIT:

Zhang odpowiedź poniżej jest blisko tego, co chcę, ale nie dokładnie to, co chciałbym zobaczyć.

Oto lepszy przykład, aby pokazać, co usiłuję uzyskać pod adresem:

%pyspark 
from pyspark.sql import functions as F 
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days")))) 

Wynika to w następujący dataframe:

dollars timestampGMT   rolling_average 
25  2017-03-18 11:27:18.0 25 
17  2017-03-10 15:27:18.0 15 
13  2017-03-15 12:27:18.0 15 

Chciałbym średnią się nad tydzień kontynuuje datę w kolumnie timestampGMT, co spowoduje:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17 
13  2017-03-15 12:27:18.0 15 
25  2017-03-18 11:27:18.0 19 

W powyższym wydaniu lts, rolling_average na 2017-03-10 to 17, ponieważ nie ma żadnych poprzednich rekordów. Wartość rolling_average dla 2017-03-15 to 15, ponieważ wynosi ona średnią 13 z 2017-03-15 i 17 z 2017-03-10, która spada z poprzedniego 7-dniowego okna. Średnia krocząca z lat 2017-03-18 wynosi 19, ponieważ średnia wynosi 25 z 2017-03-18, a 13 z 2017-03-10, która przypada w poprzednim 7-dniowym oknie, i nie obejmuje 17 z 2017 r. -03-10, ponieważ nie spada to z poprzedniego 7-dniowego okna.

Czy istnieje sposób, aby to zrobić zamiast okna binning, w którym tygodniowe okna się nie nakładają?

Odpowiedz

4

zorientowali się w prawidłowy sposób, aby obliczyć/średnia krocząca poruszający użyciu tego stackoverflow:

Spark Window Functions - rangeBetween dates

Podstawową ideą jest zamiana kolumny znacznika czasu na sekundę ds, a następnie możesz użyć funkcji rangeBetween w klasie pyspark.sql.Window, aby uwzględnić poprawne wiersze w swoim oknie.

Oto rozwiązane przykład:

%pyspark 
from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

Skutkuje dokładnym kolumnie toczenia średnich że szukałem:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17.0 
13  2017-03-15 12:27:18.0 15.0 
25  2017-03-18 11:27:18.0 19.0 
1

Czy oznacza to:

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"), 
          (13, "2017-03-11T12:27:18+00:00"), 
          (21, "2017-03-17T11:27:18+00:00")], 
          ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days")))) 

wyjściowa:

+-------+-------------------+---------------+         
|dollars|timestampGMT  |rolling_average| 
+-------+-------------------+---------------+ 
|21  |2017-03-17 19:27:18|21.0   | 
|17  |2017-03-11 23:27:18|15.0   | 
|13  |2017-03-11 20:27:18|15.0   | 
+-------+-------------------+---------------+ 
+0

Dzięki Zhang, który jest bliżej do tego, co chcę, ale nie dokładnie to, co chciałbym. Twój kod wciąż oblicza odpowiedzi za pomocą biningu daty. Chciałbym, aby każda średnia tygodniowa kończyła się z datą w rzędzie. To moja wina, że ​​nie robię świetnego przykładu. Zamierzam edytować swój wpis, podając zaktualizowany przykład pokazujący, co chcę. –