Mamy magazyn ula i chcieliśmy używać iskry do różnych zadań (głównie klasyfikacja). Czasami zapisuj wyniki jako tabelę ula. Na przykład, napisaliśmy następującą funkcję Pythona, aby znaleźć całkowitą sumę original_table kolumna dwa, pogrupowane według oryginału kolumna pierwsza. Ta funkcja działa, ale obawiamy się, że jest ona nieefektywna, zwłaszcza jeśli chodzi o konwersję map na pary klucz-wartość i wersje słownika. Kombinator funkcji, mergeValue, mergeCombiner są zdefiniowane gdzie indziej, ale działają dobrze.czytanie i pisanie z tablic ula z iskrą po agregacji
from pyspark import HiveContext
rdd = HiveContext(sc).sql('from original_table select *')
#convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))
#create rdd where rows are (key, (sum, count)
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)
# creates rdd with dictionary values in order to create schemardd
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})
# infer the schema
schema_rdd = HiveContext(sc).inferSchema(dict_rdd)
# save
schema_rdd.saveAsTable('new_table_name')
Czy istnieją bardziej skuteczne sposoby robienia tego samego?
nie wiesz, dlaczego musisz dokonać konwersji na program rdd, ale jeśli nalegasz, możesz po prostu zrobić 'key_value_rdd.reduceByKey (lambda x, y: sum (x, y))' zamiast 'combineByKey'. – mtoto