2015-02-09 13 views
5

Mamy magazyn ula i chcieliśmy używać iskry do różnych zadań (głównie klasyfikacja). Czasami zapisuj wyniki jako tabelę ula. Na przykład, napisaliśmy następującą funkcję Pythona, aby znaleźć całkowitą sumę original_table kolumna dwa, pogrupowane według oryginału kolumna pierwsza. Ta funkcja działa, ale obawiamy się, że jest ona nieefektywna, zwłaszcza jeśli chodzi o konwersję map na pary klucz-wartość i wersje słownika. Kombinator funkcji, mergeValue, mergeCombiner są zdefiniowane gdzie indziej, ale działają dobrze.czytanie i pisanie z tablic ula z iskrą po agregacji

from pyspark import HiveContext 

rdd = HiveContext(sc).sql('from original_table select *') 

#convert to key-value pairs 
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1]))) 

#create rdd where rows are (key, (sum, count) 
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner) 

# creates rdd with dictionary values in order to create schemardd 
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]}) 

# infer the schema 
schema_rdd = HiveContext(sc).inferSchema(dict_rdd) 

# save 
schema_rdd.saveAsTable('new_table_name') 

Czy istnieją bardziej skuteczne sposoby robienia tego samego?

+1

nie wiesz, dlaczego musisz dokonać konwersji na program rdd, ale jeśli nalegasz, możesz po prostu zrobić 'key_value_rdd.reduceByKey (lambda x, y: sum (x, y))' zamiast 'combineByKey'. – mtoto

Odpowiedz

0

... może to nie było możliwe, gdy pytanie zostało napisane, ale czy to nie ma sensu teraz (post 1.3), aby użyć wywołania createDataFrame()?

Po uzyskaniu pierwszego RDD wygląda na to, że można wykonać wywołanie, a następnie uruchomić proste polecenie SQL w stosunku do struktury, aby wykonać całą pracę w jednym przebiegu. (Sumowanie i grupowanie) Dodatkowo, struktura DataFrame może wywnioskować schemat bezpośrednio po utworzeniu, jeśli czytam dokument API poprawnie.

(http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)

0

Błąd ten można rozwiązać poprzez ustawienie hive.exec.scratchdir do folderu, w którym użytkownik ma dostęp

+1

To powinien być komentarz, myślę. – ketan

+0

o jakim błędzie mówisz? – mtoto

0

Jaka wersja iskry używasz?

Ta odpowiedź jest oparta na 1.6 & przy użyciu ramek danych.

val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt") 

    import org.apache.spark.sql.functions._ 
    client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show() 


+-----+---+-----+ 
|Categ|Sum|count| 
+-----+---+-----+ 
| A| 15| 2| 
| B| 56| 1| 
+-----+---+-----+ 

Mam nadzieję, że to pomoże!