2014-06-24 28 views
5

Jak mogę pobrać wynik zadania, jeśli nie wiem wcześniej, które zadanie zostało wykonane? Oto konfiguracja: Biorąc pod uwagę następujące źródła ('tasks.py'):Odzyskaj wynik z 'task_id' w Celery z nieznanego zadania

from celery import Celery 

app = Celery('tasks', backend="db+mysql://u:[email protected]/db", broker = 'amqp://guest:[email protected]:5672//') 

@app.task 
def add(x,y): 
    return x + y 


@app.task 
def mul(x,y): 
    return x * y 

z RabbitMQ 3.3.2 działa lokalnie:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server 

       RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc. 
    ## ##  Licensed under the MPL. See http://www.rabbitmq.com/ 
    ## ## 
    ########## Logs: /usr/local/var/log/rabbitmq/[email protected] 
    ###### ##  /usr/local/var/log/rabbitmq/[email protected] 
    ########## 
       Starting broker... completed with 10 plugins. 

z Seler 3.1.12 działa lokalnie:

-------------- [email protected] v3.1.12 (Cipater) 
---- **** ----- 
--- * *** * -- Darwin-13.2.0-x86_64-i386-64bit 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   tasks:0x105dea3d0 
- ** ---------- .> transport: amqp://guest:**@localhost:5672// 
- ** ---------- .> results:  disabled 
- *** --- * --- .> concurrency: 8 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 

mogę zaimportować metodę i pobrać wynik z „task_id”:

from tasks import add, mul 
from celery.result import AsyncResult 

result = add.delay(2,2) 
task_id = result.task_id 
result.get() # 4 

result = AsyncResult(id=task_id) 
result.get() # 4 

result = add.AsyncResult(id=task_id) 
result.get() # 4 

# and the same for the 'mul' task. Just imagine I put it here 

W następnym przykładzie podzielę te kroki między procesy. W jednym procesie ja odzyskać „task_id” tak:

from tasks import add 

result = add.delay(5,5) 
task_id = result.task_id 

A w innym procesie, jeśli mogę użyć tego samego „task_id” (skopiowany i wklejony do innego REPL, lub w innej żądanie HTTP) tak:

from celery.result import AsyncResult 

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:[email protected]/db") 
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta' 
result.state # AttributeError: 'str' object has no attribute 'get_task_meta' 
result.status # AttributeError: 'str' object has no attribute 'get_task_meta' 

A w innym procesie, jeśli zrobić:

from task import add # in this instance I know that an add task was performed 

result = add.AsyncResult(id="copied_task_id") 
result.status # "SUCCESSFUL" 
result.state # "SUCCESSFUL" 
result.get() # 10 

Chciałbym być w stanie uzyskać wynik nie wiedząc przed ręką, co zadaniem jest generowanie wynik. W moim prawdziwym środowisku planuję zwrócić ten task_id klientowi i pozwolić mu zapytać o status swojej pracy za pomocą żądania HTTP.

Odpowiedz

11

Ok - więc szukałem rozwiązania przez długi czas, a teraz, że mam w końcu formalnie napisali to i wyglądał na documentation znalazłem this gem:

klasa celery.result .AsyncResult (identyfikator, backend = brak, nazwa zadania = brak, aplikacja = brak, nadrzędny = brak)

Stan zadania zapytania.

Parametry:

id - patrz id.

backend - patrz backend.

wyjątkiemTimeoutError

błąd podniesiony opóźnień.

AsyncResult.Aplikacja = None

Więc zamiast zapewniać parametr backend I dostarczyły "APP" argumentu zamiast tak:

from celery.result import AsyncResult 
from task import app 

# Assuming add.delay(10,10) was called in another process 
# and that I'm using a 'task_id' I retrieved from that process 

result = AsyncResult(id='copied_task_id', app=app) 
result.state # 'SUCCESSFUL' 
result.get() # 20 

To jest chyba oczywiste dla wielu. To nie dla mnie. Na razie mogę tylko powiedzieć, że to rozwiązanie "po prostu działa", ale czułbym się bardziej komfortowo, gdybym wiedział, że jest to usankcjonowany sposób. Jeśli znasz sekcję w dokumentacji, która czyni to bardziej przejrzystym, opublikuj ją w komentarzach lub jako odpowiedź, a ja wybiorę ją jako odpowiedź, jeśli będę mogła.

+0

Dokładnie tego, czego szukałem; Podzielam twój pogląd, że jest to bardzo niejasne z dokumentacji, więc twój post bardzo mi pomógł. – markjan

+0

Możesz ustawić mały limit czasu dla tego połączenia, ponieważ niektóre wywołania selera "dostań" mogą nie wrócić przez bardzo długi czas w przypadku nieprawidłowego identyfikatora zadania lub zadania, które nie jest już znane brokerowi. Zobacz http://stackoverflow.com/a/10074280/992887 – RichVel

+0

Dziękujemy za tę poprawkę. To sprawiło, że wyciągnąłem włosy na 2 tygodnie. – Pant