2016-11-22 14 views
14

Nie rozumiem, dlaczego wydaje się, że Spark używa 1 zadania dla rdd.mapPartitions podczas przekształcania wynikowego RDD w DataFrame.pyspark przy użyciu jednego zadania dla mapPartitions podczas konwersji rdd na ramkę danych

Jest to problem dla mnie, bo chciałbym przejść od:

DataFrame -> ->rdd.mapPartitionsRDD ->DataFrame

tak, że mogę czytać w danych (DataFrame) zastosuj funkcję inną niż SQL w porcjach danych (mapPartitions na RDD), a następnie przekonwertuj z powrotem do DataFrame, aby móc korzystać z procesu DataFrame.write.

Jestem w stanie przejść z DataFrame -> mapPartitions i użyć pisarza RDD, takiego jak saveAsTextFile, ale jest to mniej niż idealne, ponieważ proces DataFrame.write może np. Nadpisywać i zapisywać dane w formacie Orc. Chciałbym się więc dowiedzieć, dlaczego tak się dzieje, ale z praktycznego punktu widzenia przede wszystkim chodzi mi o to, że mogę po prostu przejść z DataFrame -> mapParitions -> do procesu DataFrame.write.

Oto powtarzalny przykład. Poniższe działa zgodnie z oczekiwaniami, z 100 zadań dla pracy mapPartitions:

from pyspark.sql import SparkSession 
import pandas as pd 

spark = SparkSession \ 
    .builder \ 
    .master("yarn-client") \ 
    .enableHiveSupport() \ 
    .getOrCreate() 

sc = spark.sparkContext 

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]}) 
spark_df = spark.createDataFrame(df).repartition(100) 

def f(part): 
    return [(1,2)] 

spark_df.rdd.mapPartitions(f).collect() 

Jednak jeżeli ostatnia linia jest zmiana na coś spark_df.rdd.mapPartitions(f).toDF().show() to nie będzie tylko jedno zadanie do pracy mapPartitions.

Niektóre zrzuty ekranu ilustrujący ten poniżej: enter image description here enter image description here

Odpowiedz

5

DataFrame.show() pokazuje tylko pierwszą liczbę wierszy swojego dataframe domyślnie tylko pierwsze 20. Jeśli liczba ta jest mniejsza niż liczba wierszy na partycji Spark jest leniwy i ocenia tylko jedną partycję, która jest równoważna pojedynczemu zadaniu.

Można również wykonać collect na ramce danych, aby obliczyć i zebrać wszystkie partycje i ponownie wyświetlić 100 zadań.

Przedtem nadal będzie widoczne zadanie runJob, które jest spowodowane wywołaniem toDF, aby móc określić wynikowy schemat danej ramki danych: musi przetworzyć pojedynczą partycję, aby móc określić typy wyników mapowania funkcjonować. Po tym początkowym etapie rzeczywiste działanie, takie jak collect, będzie miało miejsce na wszystkich partycjach. Na przykład, dla mnie prowadzenie fragment z ostatniej linii zastąpiono spark_df.rdd.mapPartitions(f).toDF().collect() wyników w tych etapach:

enter image description here

+0

To samo dzieje się, gdy wywołanie 'DataFrame.write' na wynik, jak również. – David

+0

Czy czekasz na zakończenie swoich zadań? Kiedy robię 'toDF(). Collect()', widzę także etap 'runJob' z jednym zadaniem, zainicjowanym przez' toDF' w celu sprawdzenia schematu wynikowej ramki danych, po której następuje etap 'collect' z oczekiwanym 100 zadań. – sgvd

+1

'collect()' nie jest wykonalne dla mnie w rzeczywistości, biorąc pod uwagę, że końcowy wynik to kilkaset GB danych. Zadanie kończy się niepowodzeniem podczas uruchamiania 'DataFrame.write' z tylko jednym zadaniem, ale powodzenie przy uruchomieniu' saveAsText'. Będę edytować przykłady z kolekcji i pokaż do zapisywania danych, ponieważ może być różnica między nimi. – David