Poniżej znajduje się przykładowy kod, który jest uruchomiony. kiedy działa to iskrzenie, łączenie Dataframe odbywa się za pomocą sortmergejoin zamiast broadcastjoin.Emisja nie ma miejsca podczas dołączania do ramek danych w programie Spark 1.6
def joinedDf (sqlContext: SQLContext,
txnTable: DataFrame,
countriesDfBroadcast: Broadcast[DataFrame]):
DataFrame = {
txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
$"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
Transmisja nie następuje, nawet jeśli podam wskazówkę broadcast() w instrukcji join.
Optymalizator jest hashpartitioning theframe i powoduje skos danych.
Czy ktoś widział to zachowanie?
Używam tego w przędzy przy użyciu Spark 1.6 i HiveContext jako SQLContext. Praca iskry działa na 200 executorach. a rozmiar danych txnTable wynosi 240 GB, a datasize countryDf to 5 MB.
Teraz widzę BroadcastHashJoin w jednym przebiegu i SortMergeJoin w innym uruchomieniu. (ten sam kod, inny zestaw danych). –
Domyślam się, że przekracza próg wielkości dla połączeń transmisji. – zero323
Mam bardzo wysoką iskry.sql.autoBroadcastJoinThreshold. Około. 1 GB. A plik, który jest nadawany, ma około 5 MB. Jednak w innym przypadku powyższe zalecenie działa świetnie. –