2017-03-22 62 views
10

chcę przekonwertować RDD do DataFrame i chcą buforować wyniki RDD:buforowanie nakazał Spark DataFrame tworzy niechciane zadanie

from pyspark.sql import * 
from pyspark.sql.types import * 
import pyspark.sql.functions as fn 

schema = StructType([StructField('t', DoubleType()), StructField('value', DoubleType())]) 

df = spark.createDataFrame(
    sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(), 
    schema=schema, 
    verifySchema=False 
).orderBy("t") #.cache() 
  • Jeśli nie używać cache funkcji nie jest praca wygenerowany.
  • Jeśli używasz cache tylko po orderBy 1 pracy jest generowany dla cache: enter image description here
  • Jeśli używasz cache dopiero po parallelize praca nie jest generowany.

Dlaczego cache generuje pracę w tym jednym przypadku? Jak mogę uniknąć generowania zleceń z cache (buforowanie DataFrame i brak RDD)?

Edit: badałem bardziej na problem i okazało się, że bez orderBy("t") nie praca jest generowany. Czemu?

+0

Wraz z aktualizacjami wyjaśniającymi pytanie, usunąłem oryginalną odpowiedź. To interesujące pytanie, ponieważ orderBy ("t") jest leniwie oceniany tak, jak powinien, a cache() bez orderBy jest również leniwie oceniany, ale razem też nie jestem pewien, dlaczego coś jest wykonywane dla operacji czysto transformatora. – Garren

Odpowiedz

1

I złożyła bug ticket i było zamknięte z następującego powodu:

buforowanie wymaga RDD podkładową. Wymaga to również znajomości partycji podkładowych , co jest nieco szczególne w przypadku zamówienia globalnego: uruchamia zadanie (skanowanie), ponieważ musimy określić granice partycji .