2016-02-05 13 views
5

Poniżej znajduje się przykładowy kod, który jest uruchomiony. kiedy działa to iskrzenie, łączenie Dataframe odbywa się za pomocą sortmergejoin zamiast broadcastjoin.Emisja nie ma miejsca podczas dołączania do ramek danych w programie Spark 1.6

def joinedDf (sqlContext: SQLContext, 
       txnTable: DataFrame, 
       countriesDfBroadcast: Broadcast[DataFrame]): 
       DataFrame = { 
        txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"), 
        $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
       } 
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp") 

Transmisja nie następuje, nawet jeśli podam wskazówkę broadcast() w instrukcji join.

Optymalizator jest hashpartitioning theframe i powoduje skos danych.

Czy ktoś widział to zachowanie?

Używam tego w przędzy przy użyciu Spark 1.6 i HiveContext jako SQLContext. Praca iskry działa na 200 executorach. a rozmiar danych txnTable wynosi 240 GB, a datasize countryDf to 5 MB.

Odpowiedz

7

Zarówno sposób emisji DataFrame, jak i sposób jej uzyskiwania, są nieprawidłowe.

  • Standardowe transmisje nie mogą być używane do obsługi rozproszonych struktur danych. Jeśli chcesz dołączyć do przeprowadzenia transmisji na DataFrame należy użyć broadcast funkcje, które znaki podane DataFrame do nadawania:

    import org.apache.spark.sql.functions.broadcast 
    
    val countriesDf: DataFrame = ??? 
    val tmp: DataFrame = broadcast(
        countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries") 
    ) 
    
    txnTable.as("df1").join(
        broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
    

    Wewnętrznie to będzie collecttmp bez konwersji z wewnętrznego i audycji później.

  • Dołączanie argumentów jest chętnie oceniane. Nawet możliwe było użycie SparkContext.broadcast, gdy struktura danych rozproszonych jest oceniana lokalnie przed wywołaniem join. To dlatego funkcja działa w ogóle, ale nie wykonuje przyłączenia transmisji.

+0

Teraz widzę BroadcastHashJoin w jednym przebiegu i SortMergeJoin w innym uruchomieniu. (ten sam kod, inny zestaw danych). –

+0

Domyślam się, że przekracza próg wielkości dla połączeń transmisji. – zero323

+0

Mam bardzo wysoką iskry.sql.autoBroadcastJoinThreshold. Około. 1 GB. A plik, który jest nadawany, ma około 5 MB. Jednak w innym przypadku powyższe zalecenie działa świetnie. –