2017-02-18 34 views
12

Próbuję zapisać DataFrame do HDFS w formacie parkietu przy użyciu DataFrameWriter, podzielona przez trzy wartości kolumn, tak:Jak podzielić i napisać DataFrame w Spark bez usuwania partycji bez nowych danych?

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

Jak wspomniano w this question, partitionBy usunie pełną istniejącą hierarchię partycji na path i zastąpił je partycjami w dataFrame. Ponieważ nowe dane przyrostowe dla danego dnia będą się pojawiać okresowo, chcę tylko zastąpić te partycje w hierarchii, dla których dataFrame ma dane, pozostawiając pozostałe nietknięte.

Aby to zrobić, wydaje się, że trzeba zapisać każdy indywidualnie partycję używając jego pełną ścieżkę, coś takiego:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

Jednak mam problemy ze zrozumieniem, że najlepszym sposobem do organizowania danych w pojedynczej partycji DataFrame s, dzięki czemu mogę je zapisać, korzystając z pełnej ścieżki. Jednym z pomysłów było coś takiego:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

Ale foreachPartition działa na Iterator[Row] który nie jest idealny do zapisywania się do formatu parkietem.

Rozważałem również użycie select...distinct eventdate, hour, processtime, aby uzyskać listę partycji, a następnie filtrowanie oryginalnej ramki danych przez każdą z tych partycji i zapisywanie wyników w pełnej partycjonowanej ścieżce. Ale odrębna kwerenda plus filtr dla każdej partycji nie wydaje się bardzo wydajna, ponieważ byłaby to duża liczba operacji filtrowania/zapisu.

Mam nadzieję, że istnieje lepszy sposób na zachowanie istniejących partycji, dla których dataFrame nie ma danych?

Dzięki za przeczytanie.

Wersja ze Spark: 2.1

Odpowiedz

0

Możesz wypróbować tryb jako dodatek.

dataFrame.write.format("parquet") 
.mode("append") 
.partitionBy("year","month") 
.option("path",s"$path/table_name") 
.saveAsTable(s"stg_table_name") 
1

Opcja trybu Append ma haczyk!

df.write.partitionBy("y","m","d") 
.mode(SaveMode.Append) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName) 

Przetestowałem i zobaczyłem, że zachowa to istniejące pliki partycji. Jednak problem ten jest następujący: Jeśli uruchomisz ten sam kod dwa razy (z tymi samymi danymi), utworzy on nowe pliki parkietu zamiast zastępowania istniejących dla tych samych danych (Spark 1.6). Tak więc, zamiast używać Append, nadal możemy rozwiązać ten problem z Overwrite. Zamiast nadpisywać na poziomie tabeli, powinniśmy nadpisać na poziomie partycji.

df.write.mode(SaveMode.Overwrite) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day) 

Patrz na poniższy link, aby uzyskać więcej informacji:

Overwrite specific partitions in spark dataframe write method

(zaktualizowałem moją odpowiedź po komentarzu suriyanto za Thnx.).

+0

Czy sprawdzić, czy kiedy piszesz to samo dane dwa razy, że zastępuje starą partycję? Z mojego testu faktycznie tworzy on nowy plik parkietu wewnątrz katalogu partycji, powodując podwojenie danych. Jestem na Spark 2.2. – suriyanto