2017-01-25 44 views
7

staram się ponownego dzielenia DataFrame według columnm The DataFrame ma N (powiedzmy N=3) różne wartości w partycji kolumnie x, np:opuszczając puste przegródki DataFrame Apache Spark

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data 

Co ja Chcąc to osiągnąć, należy ponownie rozdzielić myDF przez x bez tworzenia pustych partycji. Czy jest lepszy sposób niż to zrobić?

val numParts = myDF.select($"x").distinct().count.toInt 
myDF.repartition(numParts,$"x") 

(Gdybym nie określają numParts w repartiton, większość moich partycji są puste (jak repartition tworzy 200 partycje) ...)

+1

Zgodnie z http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options, 200 partycji zostanie utworzonych z powodu wartości domyślnej dla opcji konfiguracyjnej 'iskr .sql.shuffle.partitions' – AKSW

+1

Odpowiedź można znaleźć http://stackoverflow.com/questions/41854818/spark-dataframe-repartition-number-of-partition-not-przedstawiony?noredirect=1#comment70893687_41854818 – FaigB

Odpowiedz

2

myślałem roztworu z iteracji po df partycja i pobieranie rekordu w nim znajdują się, aby znaleźć niepuste partycje.

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") 

df.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

Jak mamy niepuste partycje (nonEmptyPart), możemy oczyścić pustych partycji za pomocą coalesce() (check coalesce() vs reparation()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type 

To może lub nie może być najlepszy, ale to rozwiązanie uniknie tasowanie jak nie używamy reparation()


przykład zająć komentarz

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x") 
val nonEmptyPart = sc.longAccumulator("nonEmptyPart") 

df1.foreachPartition(partition => 
    if (partition.length > 0) nonEmptyPart.add(1)) 

val finalDf = df1.coalesce(nonEmptyPart.value.toInt) 

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}") 
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}") 
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}") 

Output

nonEmptyPart => 3 
df.rdd.partitions.length => 200 
finalDf.rdd.partitions.length => 3 
+0

'val df = sc.parallelize (Seq (1,1,2,2,3,3)) toDF ("x") .rozdzielenie (10, $ "x") .coalesce (3) '. Teraz zawęża liczbę partycji z 10 do 3. – mrsrinivas

+0

, a teraz robi 'finalDf.foreachPartition (p => println (p.size))'. Otrzymuję '0 0 6', tzn. 2 partycje są puste, 1 zawiera wszystkie wiersze. To nie jest to czego chciałem (jestem Spisk 1.6.3) –

+0

Może to być spowodowane wyłączeniem losowym z 'coalesce'. Spróbuj użyć 'repartition', przetasuje wszystkie dane zgodnie z' HashPartitioner'. więc będzie szansa, że ​​każda partycja zostanie wypełniona pewnymi danymi. Jeśli naprawdę chcesz usunąć puste partycje, możesz go uruchomić (** znalezienie niepustych partycji i zastosowanie koalescji/repartycji **) iteracyjnie. – mrsrinivas