Próbuję zapisać DataFrame
do HDFS w formacie parkietu przy użyciu DataFrameWriter
, podzielona przez trzy wartości kolumn, tak:Jak podzielić i napisać DataFrame w Spark bez usuwania partycji bez nowych danych?
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
Jak wspomniano w this question, partitionBy
usunie pełną istniejącą hierarchię partycji na path
i zastąpił je partycjami w dataFrame
. Ponieważ nowe dane przyrostowe dla danego dnia będą się pojawiać okresowo, chcę tylko zastąpić te partycje w hierarchii, dla których dataFrame
ma dane, pozostawiając pozostałe nietknięte.
Aby to zrobić, wydaje się, że trzeba zapisać każdy indywidualnie partycję używając jego pełną ścieżkę, coś takiego:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
Jednak mam problemy ze zrozumieniem, że najlepszym sposobem do organizowania danych w pojedynczej partycji DataFrame
s, dzięki czemu mogę je zapisać, korzystając z pełnej ścieżki. Jednym z pomysłów było coś takiego:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
Ale foreachPartition
działa na Iterator[Row]
który nie jest idealny do zapisywania się do formatu parkietem.
Rozważałem również użycie select...distinct eventdate, hour, processtime
, aby uzyskać listę partycji, a następnie filtrowanie oryginalnej ramki danych przez każdą z tych partycji i zapisywanie wyników w pełnej partycjonowanej ścieżce. Ale odrębna kwerenda plus filtr dla każdej partycji nie wydaje się bardzo wydajna, ponieważ byłaby to duża liczba operacji filtrowania/zapisu.
Mam nadzieję, że istnieje lepszy sposób na zachowanie istniejących partycji, dla których dataFrame
nie ma danych?
Dzięki za przeczytanie.
Wersja ze Spark: 2.1
Czy sprawdzić, czy kiedy piszesz to samo dane dwa razy, że zastępuje starą partycję? Z mojego testu faktycznie tworzy on nowy plik parkietu wewnątrz katalogu partycji, powodując podwojenie danych. Jestem na Spark 2.2. – suriyanto