Nie rozumiem, dlaczego wydaje się, że Spark używa 1 zadania dla rdd.mapPartitions
podczas przekształcania wynikowego RDD w DataFrame.pyspark przy użyciu jednego zadania dla mapPartitions podczas konwersji rdd na ramkę danych
Jest to problem dla mnie, bo chciałbym przejść od:
DataFrame
-> ->rdd.mapPartitions
RDD
->DataFrame
tak, że mogę czytać w danych (DataFrame) zastosuj funkcję inną niż SQL w porcjach danych (mapPartitions na RDD), a następnie przekonwertuj z powrotem do DataFrame, aby móc korzystać z procesu DataFrame.write
.
Jestem w stanie przejść z DataFrame -> mapPartitions i użyć pisarza RDD, takiego jak saveAsTextFile, ale jest to mniej niż idealne, ponieważ proces DataFrame.write
może np. Nadpisywać i zapisywać dane w formacie Orc. Chciałbym się więc dowiedzieć, dlaczego tak się dzieje, ale z praktycznego punktu widzenia przede wszystkim chodzi mi o to, że mogę po prostu przejść z DataFrame -> mapParitions -> do procesu DataFrame.write.
Oto powtarzalny przykład. Poniższe działa zgodnie z oczekiwaniami, z 100 zadań dla pracy mapPartitions
:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession \
.builder \
.master("yarn-client") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
Jednak jeżeli ostatnia linia jest zmiana na coś spark_df.rdd.mapPartitions(f).toDF().show()
to nie będzie tylko jedno zadanie do pracy mapPartitions
.
Niektóre zrzuty ekranu ilustrujący ten poniżej:
To samo dzieje się, gdy wywołanie 'DataFrame.write' na wynik, jak również. – David
Czy czekasz na zakończenie swoich zadań? Kiedy robię 'toDF(). Collect()', widzę także etap 'runJob' z jednym zadaniem, zainicjowanym przez' toDF' w celu sprawdzenia schematu wynikowej ramki danych, po której następuje etap 'collect' z oczekiwanym 100 zadań. – sgvd
'collect()' nie jest wykonalne dla mnie w rzeczywistości, biorąc pod uwagę, że końcowy wynik to kilkaset GB danych. Zadanie kończy się niepowodzeniem podczas uruchamiania 'DataFrame.write' z tylko jednym zadaniem, ale powodzenie przy uruchomieniu' saveAsText'. Będę edytować przykłady z kolekcji i pokaż do zapisywania danych, ponieważ może być różnica między nimi. – David