9

Próbuję zrozumieć fizyczne plany na iskrze, ale nie rozumiem niektórych części, ponieważ wyglądają one inaczej niż tradycyjne rdbms. Na przykład w poniższym planie jest plan dotyczący zapytania nad tabelą ula. Zapytanie to:Zrozumienie fizycznego planu iskrzenia

select 
     l_returnflag, 
     l_linestatus, 
     sum(l_quantity) as sum_qty, 
     sum(l_extendedprice) as sum_base_price, 
     sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, 
     sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, 
     avg(l_quantity) as avg_qty, 
     avg(l_extendedprice) as avg_price, 
     avg(l_discount) as avg_disc, 
     count(*) as count_order 
    from 
     lineitem 
    where 
     l_shipdate <= '1998-09-16' 
    group by 
     l_returnflag, 
     l_linestatus 
    order by 
     l_returnflag, 
     l_linestatus; 


== Physical Plan == 
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None 
     +- ConvertToSafe 
     +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L]) 
      +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None 
       +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L]) 
        +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35] 
        +- Filter (l_shipdate#37 <= 1998-09-16) 
         +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None 

Na co mi zrozumienie w planie jest:

  1. Pierwszy rozpoczyna się skanowanie tabeli Hive

  2. Następnie filtrować za pomocą których stan

  3. Następnie projekt, aby uzyskać kolumny, które chcemy

  4. Następnie TungstenAggregate?

  5. Następnie TungstenExchange?

  6. Następnie ponownie TungstenAggregate?

  7. Następnie ConvertToSafe?

  8. Następnie sortuje wynik końcowy

Ale ja nie rozumiejąc kroki 4, 5, 6 i 7. Wiesz czym one są? Szukam informacji na ten temat, aby zrozumieć plan, ale nie znajduję nic konkretnego.

Odpowiedz

14

Spójrzmy na strukturę zapytania SQL użyć:

SELECT 
    ... -- not aggregated columns #1 
    ... -- aggregated columns  #2 
FROM 
    ...       -- #3 
WHERE 
    ...       -- #4 
GROUP BY 
    ...       -- #5 
ORDER BY 
    ...       -- #6 

Jak już podejrzany:

  • Filter (...) odpowiada orzeczników w WHERE klauzuli numer (#4)
  • Project ... limitów kolumn do wymaganych przez unię (#1 i #2 i #4/#6 jeśli nie występuje w SELECT)
  • HiveTableScan odpowiada FROM punkt (#3)

Pozostałe części można przypisać w następujący sposób:

  • #2 z SELECT punkt - functions pola w TungstenAggregates
  • GROUP BY (#4):

    • TungstenExchange/hash partycjonowania
    • key pola w TungstenAggregates
  • #6 - ORDER BY klauzuli.

Projekt Wolfram ogólnie opisuje szereg optymalizacji wykorzystywanych przez Spark DataFrames (- sets), w tym:

  • wyraźnej zarządzania pamięcią z sun.misc.Unsafe. Oznacza to "natywne" wykorzystanie pamięci i jawne przydzielanie/zwalnianie pamięci poza zarządzaniem GC. Konwersje te odpowiadają ConvertToUnsafe/ConvertToSafe krokom w planie wykonania. Możesz dowiedzieć się kilku ciekawych szczegółów na temat niebezpieczeństwa z Understanding sun.misc.Unsafe
  • generowania kodu - różnych sztuczek meta-programowania zaprojektowanych do generowania kodu, który jest lepiej zoptymalizowany podczas kompilacji. Możesz myśleć o tym jak o wewnętrznym kompilatorze Spark, który robi takie rzeczy jak przepisywanie ładnego kodu funkcjonalnego na brzydki dla pętli.

Możesz dowiedzieć się więcej o wolframu w ogólności od Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter podaje kilka przykładów generowania kodu.

Występuje dwa razy, ponieważ dane są najpierw agregowane lokalnie na każdej partycji, następnie są tasowane i ostatecznie scalane. Jeśli znasz interfejs API RDD, ten proces jest mniej więcej równoważny z reduceByKey.

Jeśli plan wykonania nie jest jasny, możesz również spróbować przekonwertować wynikową wersję DataFrame na RDD i przeanalizować wyjście z toDebugString.

+0

Dzięki za odpowiedź.Po prostu nie zrozumiałem wyraźnie tej części "# 2 z klauzuli SELECT - pole funkcji w TungstenAggregates". Jeśli potrafisz lepiej wyjaśnić, to bądź miły! – codin

+0

Pole 'Funkcje' wyświetla wszystkie agregacje, które są wykonywane na danym etapie, podczas gdy pole' Klucz' opisuje grupowanie. jest to 'df.groupBy (* key) .agg (* funkcje)'. – zero323

1

Tungsten jest nowym silnikiem pamięci w Spark od wersji 1.4, który zarządza danymi poza JVM, aby zaoszczędzić trochę narzutu GC. Możesz sobie wyobrazić, że zrobienie tego wymaga kopiowania danych zi do JVM. to jest to! W Sparku 1.5 możesz wyłączyć Tungsten poprzez spark.sql.tungsten.enabled, wtedy zobaczysz "stary" plan, w Sparku 1.6 Myślę, że nie możesz go już wyłączyć.