2016-05-24 28 views
8

Mam iskrową ramkę danych o następującej strukturze. Ciało_tekst_token ma tokeny (przetworzone/zestaw słów). I mam zagnieżdżonych listę zdefiniowanych słów kluczowychPrzekazywanie kolumny ramki danych i zewnętrznej listy do udf w ramach z Kolumną

root 
|-- id: string (nullable = true) 
|-- body: string (nullable = true) 
|-- bodyText_token: array (nullable = true) 

keyword_list=['union','workers','strike','pay','rally','free','immigration',], 
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']] 

I potrzebnych, by sprawdzić, ile żetonów wchodzą w każdej listy słów kluczowych i dodać wynik jako nowej kolumny istniejącego dataframe. Np .: jeśli tokens =["become", "farmer","rally","workers","student"] wynikiem będzie -> [1,2,0]

Następująca funkcja działała zgodnie z oczekiwaniami.

def label_maker_topic(tokens,topic_words): 
    twt_list = [] 
    for i in range(0, len(topic_words)): 
     count = 0 
     #print(topic_words[i]) 
     for tkn in tokens: 
      if tkn in topic_words[i]: 
       count += 1 
     twt_list.append(count) 

    return twt_list 

Użyłem modułu udf w kolumnie z dostępnymi funkcjami i otrzymałem komunikat o błędzie. Myślę, że chodzi o przekazanie zewnętrznej listy do udf. Czy jest sposób, w jaki mogę przekazać zewnętrzną listę i kolumnę datafram do udf i dodać nową kolumnę do mojej ramki danych?

topicWord = udf(label_maker_topic,StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list)) 

Odpowiedz

20

Najczystsze rozwiązaniem jest przekazanie dodatkowych argumentów za pomocą zamknięcia:

def make_topic_word(topic_words): 
    return udf(lambda c: label_maker_topic(c, topic_words)) 

df = sc.parallelize([(["union"],)]).toDF(["tokens"]) 

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) 
    .show()) 

ta nie wymaga żadnych zmian w keyword_list lub funkcji owinąć z UDF. Możesz również użyć tej metody, aby przekazać dowolny obiekt. Może to służyć do przekazania na przykład listy sets dla wydajnych wyszukiwań.

Jeśli chcesz korzystać z bieżącej UDF i przekazać topic_words bezpośrednio musisz przekonwertować go do kolumny dosłownym pierwszy:

from pyspark.sql.functions import array, lit 

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) 
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show() 

zależności od danych i wymagań nie może alternatywne, bardziej efektywnych rozwiązań, które nie wymagają UDFs (explode + aggregate + collapse) lub lookups (operacje mieszania + wektorowe).

7

Poniższe działa dobrze, gdzie każdy parametr zewnętrzny może być przekazane do UDF (kod manipulowane, aby pomóc każdemu)

topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token)) 
+0

To działa, ale byłbym ostrożny z tym, ponieważ UDF będzie mieć 'topic_words 'wartość w momencie zdefiniowania pakietu udf. Tak więc zmiana 'topic_words' i ponowne użycie udf później nie będzie działać - nadal będzie używać wartości' topic_words' w momencie zdefiniowania pakietu udf. – CHP