2016-02-16 9 views
8

Używam PySpark do klasycznego zadania ETL (ładuję zbiór danych, przetwarzam go, zapisuję) i chcę zapisać moją ramkę danych jako pliki/katalog partycjonowany przez "wirtualną" kolumnę ; co mam na myśli przez "wirtualny" jest to, że mam kolumnę Znacznik czasu, który jest ciągiem zawierającym datę zakodowaną w ISO 8601, i chciałbym podzielić go na rok/miesiąc/dzień; ale w rzeczywistości nie mam kolumny Data, Miesiąc lub Dzień w DataFrame; Mam ten znacznik czasu, z którego mogę wyprowadzić te kolumny, ale nie chcę, aby moje pozycje wynikowe miały jedną z tych kolumn serializowaną.Spark: zapisz DataFrame podzieloną na partycje przez "wirtualną" kolumnę

Struktura plików wynikające z zapisaniem DataFrame na dysku powinna wyglądać następująco:

/ 
    year=2016/ 
     month=01/ 
      day=01/ 
       part-****.gz 

Czy istnieje sposób, aby zrobić to, co chcę z Spark/Pyspark?

Odpowiedz

15

Kolumny używane do partycjonowania nie są zawarte w samych serializowanych danych. Na przykład, jeśli tworzysz DataFrame takiego:

df = sc.parallelize([ 
    (1, "foo", 2.0, "2016-02-16"), 
    (2, "bar", 3.0, "2016-02-16") 
]).toDF(["id", "x", "y", "date"]) 

i zapisać go w następujący sposób:

import tempfile 
from pyspark.sql.functions import col, dayofmonth, month, year 
outdir = tempfile.mktemp() 

dt = col("date").cast("date") 
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")] 
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname] 

(df 
    .select(*exprs) 
    .write 
    .partitionBy(*(name for _, name in fname)) 
    .format("json") 
    .save(outdir)) 

poszczególne pliki nie będą zawierać kolumny podziału: dane

import os 

(sqlContext.read 
    .json(os.path.join(outdir, "year=2016/month=2/day=16/")) 
    .printSchema()) 

## root 
## |-- date: string (nullable = true) 
## |-- id: long (nullable = true) 
## |-- x: string (nullable = true) 
## |-- y: double (nullable = true) 

partycjonowanie są przechowywane tylko w strukturze katalogów i nie jest duplikowany w plikach seryjnych. Zostanie on dołączony tylko wtedy, gdy przeczytane pełne lub częściowe drzewo katalogów:

sqlContext.read.json(outdir).printSchema() 

## root 
## |-- date: string (nullable = true) 
## |-- id: long (nullable = true) 
## |-- x: string (nullable = true) 
## |-- y: double (nullable = true) 
## |-- year: integer (nullable = true) 
## |-- month: integer (nullable = true) 
## |-- day: integer (nullable = true) 

sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema() 

## root 
## |-- date: string (nullable = true) 
## |-- id: long (nullable = true) 
## |-- x: string (nullable = true) 
## |-- y: double (nullable = true) 
## |-- day: integer (nullable = true) 
+0

Jestem nowy dla Pythona. Czy istnieje sposób, aby to zrobić bez posiadania roku =, miesiąca = i dnia = w ścieżce? Rozumiem większość z tego – deanw