5

Patrzę na funkcję przesuwania okna dla Spark DataFrame w Spark SQL, Scala.islowe okno funkcji sql opóźnienie

Mam ramkę danych z kolumnami Col1, Col1, Col1, data.

Col1 Col2 Col3 date  volume new_col 
         201601 100.5 
         201602 120.6 100.5 
         201603 450.2 120.6 
         201604 200.7 450.2 
         201605 121.4 200.7` 

Teraz chcę dodać nową kolumnę z nazwą (new_col) z jednym rzędem przesuwany w dół, jak pokazano powyżej.

Spróbowałem poniżej opcji, aby użyć funkcji okna.

val windSldBrdrxNrx_df = df.withColumn("Prev_brand_rx", lag("Prev_brand_rx",1)) 

Czy ktoś może mi pomóc, jak to zrobić.

+0

@Ramesh do Spark 2.0, użytkownicy musieli używać 'HiveContext' zamiast' SQLContext' do stosowania funkcji okna. 'HiveContext' jest tworzony w ten sam sposób, co' SQLContext', przekazując instancję 'SparkContext'. Jeśli dobrze pamiętam, musisz również dołączyć 'org.apache.spark: spark-hive_2.10' z odpowiednią wersją do swojej dystrybucji Spark. –

+0

@msrinivas, Dziękuję, że odpowiedź jest prawidłowa. – Ramesh

Odpowiedz

9

Robisz właściwie wszystko, czego brakowało to over(window expression) na lag

val df = sc.parallelize(Seq((201601, 100.5), 
    (201602, 120.6), 
    (201603, 450.2), 
    (201604, 200.7), 
    (201605, 121.4))).toDF("date", "volume") 

val w = org.apache.spark.sql.expressions.Window.orderBy("date") 

import org.apache.spark.sql.functions.lag 

val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w)) 

leadDf.show() 

+------+------+-------+ 
| date|volume|new_col| 
+------+------+-------+ 
|201601| 100.5| 0.0| 
|201602| 120.6| 100.5| 
|201603| 450.2| 120.6| 
|201604| 200.7| 450.2| 
|201605| 121.4| 200.7| 
+------+------+-------+ 

Ten kod został uruchomiony na Spark skorupy 2.0.2

+0

Nie mam 1.5.2 ustawiania i rozwiązywania problemów z mavenem, aby załadować 1.5.2 (słoik z uchem) w moim komputerze. – mrsrinivas

+0

Jestem w stanie stworzyć kontekst ula teraz. Ale wciąż mam ten sam błąd. – Ramesh

+0

Myślę, że od kiedy ramka danych jest tworzona za pomocą sqlcontext i nadal nie można korzystać z funkcji okna. – Ramesh

1

Można importować poniżej dwóch pakietów, które rozwiążą kwestię lag zależności.

import org.apache.spark.sql.functions.{lead, lag} 
import org.apache.spark.sql.expressions.Window