2013-02-19 23 views
11

Mam sytuację podobną do tej opisanej here, z tym że zamiast łączyć zadania z wieloma argumentami, chcę łańcuchować zadania, które zwracają słownik z wieloma wpisami.Selerowy łańcuch zadań i uzyskiwanie dostępu ** kwargs

To jest - bardzo luźno i abstrakcyjnie --- co usiłuję zrobić:

tasks.py

@task() 
def task1(item1=None, item2=None): 
    item3 = #do some stuff with item1 and item2 to yield item3 
    return_object = dict(item1=item1, item2=item2, item3=item3) 
    return return_object 

def task2(item1=None, item2=None, item3=None): 
    item4 = #do something with item1, item2, item3 to yield item4 
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4) 
    return return_object 

Praca ipython, jestem w stanie wywołać Zadania1 indywidualnie i asynchronicznie bez problemów.

mogę również zadzwonić task2 indywidualnie z wyniku zwróconego przez Zadania1 jako double-gwiazdkowego argumentu:

>>res1 = task1.s(item1=something, item2=something_else).apply_async() 
>>res1.status 
'SUCCESS' 
>>res2 = task2.s(**res1.result).apply_async() 
>>res2.status 
'SUCCESS 

Jednak to, co ostatecznie chcemy osiągnąć jest taki sam efekt końcowy, jak wyżej, ale za pomocą łańcucha, i tutaj, nie mogę dowiedzieć się, jak mają task2 nie tworzony z (pozycyjnych) argumentów zwróconych przez Zadania1, ale z task1.result jako ** kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK! 

podejrzewam, że mogę wrócić i przepisać moje zadania tak, aby t hej zwracają argumenty pozycyjne zamiast słownika, a to może wyjaśnić, ale wydaje mi się, że powinien istnieć jakiś sposób dostępu do obiektu zwracanego task1 w zadaniu 2 z równoważną funkcjonalnością ** podwójnej gwiazdy. Podejrzewam również, że brakuje mi czegoś dość oczywistego na temat implementacji podzadania w Celery lub * args vs. ** kwargs.

Mam nadzieję, że to ma sens. I z góry dziękuję za wszelkie wskazówki.

Odpowiedz

1

chain oraz inne podstawowe elementy canvas należą do rodziny funkcjonalnych narzędzi użytkowych , takich jak map i reduce.

E.g. gdzie map(target, items) wywołuje target(item) dla każdej pozycji na liście, Python ma rzadko używaną wersję mapy o nazwie itertools.starmap, , która zamiast tego wywołuje target(*item).

Chociaż można dodać starchain, a nawet kwstarchain do przybornika, te będą bardzo specjalistyczne i prawdopodobnie nie będą używane tak często.

ciekawe Python dokonał te zbędne z listy i generatora wyrażeń tak, że mapa jest zastąpiony [target(item) for item in item] i Starmap z [target(*item) for item in item].

Zamiast więc implementować kilka alternatyw dla każdego prymitywu, uważam, że powinniśmy skupić się na znalezieniu bardziej elastycznego sposobu na poparcie tego, np. jak posiadanie wyrażeń generatorowych generowanych przez selera (jeśli to możliwe, a jeśli nie coś równie potężnego)

+0

zrozumiałe. Dzięki. W końcu rozwiązałem to przez nieznaczną zmianę danych wejściowych/zwrotów w moim zadaniu. T2 szuka teraz pojedynczego obiektu dyktowanego jako wejścia, a następnie pobiera oczekiwane pary k/wartości z dyktafonu, aby wykonać zadanie. –

+0

@BenjaminWhite wciąż nie mogę tego zdobyć. możesz mi powiedzieć, jak to zrobiłeś – ashim888

1

Ponieważ nie jest to wbudowane w seler, napisałem funkcję dekoratora do czegoś podobnego siebie.

# Use this wrapper with functions in chains that return a tuple. The 
# next function in the chain will get called with that the contents of 
# tuple as (first) positional args, rather than just as just the first 
# arg. Note that both the sending and receiving function must have 
# this wrapper, which goes between the @task decorator and the 
# function definition. This wrapper should not otherwise interfere 
# when these conditions are not met. 

class UnwrapMe(object): 
    def __init__(self, contents): 
     self.contents = contents 

    def __call__(self): 
     return self.contents 

def wrap_for_chain(f): 
    """ Too much deep magic. """ 
    @functools.wraps(f) 
    def _wrapper(*args, **kwargs): 
     if type(args[0]) == UnwrapMe: 
      args = list(args[0]()) + list(args[1:]) 
     result = f(*args, **kwargs) 

     if type(result) == tuple and current_task.request.callbacks: 
      return UnwrapMe(result) 
     else: 
      return result 
    return _wrapper 

Kopalnia jak rozpakowuje koncepcji starchain, ale można łatwo modyfikować go zamiast rozpakować kwargs.

5

To jest moje zdanie na ten problem, przy użyciu klasy abstrakcyjnej zadanie:

from __future__ import absolute_import 
from celery import Task 
from myapp.tasks.celery import app 


class ChainedTask(Task): 
    abstract = True  

    def __call__(self, *args, **kwargs): 
     if len(args) == 1 and isinstance(args[0], dict): 
      kwargs.update(args[0]) 
      args =() 
     return super(ChainedTask, self).__call__(*args, **kwargs) 

@app.task(base=ChainedTask) 
def task1(x, y): 
    return {'x': x * 2, 'y': y * 2, 'z': x * y}  


@app.task(base=ChainedTask) 
def task2(x, y, z): 
    return {'x': x * 3, 'y': y * 3, 'z': z * 2} 

Teraz można zdefiniować i wykonać swój łańcuch jako takie:

from celery import chain 

pipe = chain(task1.s(x=1, y=2) | task2.s()) 
pipe.apply_async()