2014-11-28 33 views
5

Chcę wysłać wiadomości do selera, a gdy dotrze, powiedzmy 100 wiadomości, chcę, aby seler wykonał je partiami. Jest to typowy scenariusz, jeśli chcę zatwierdzać w partiach do bazy danych.Seler wykonuję zadanie z serią wiadomości

W tym celu podczas googlowania wokół znalazłem ten link: do wykonywania partii z selera: http://celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html

Moim problemem jest to, że w tym przykładzie nie jest oczywistym sposobem, aby uzyskać dane przekazane do zadania

na przykład powiedzmy, że mamy złożyć jeden po drugim kilka wiadomości z:

task.apply_async((message,), link_error=error_handler.s()) 

i wtedy mamy następujące realizację:

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1) 
def process_messages(requests): 
    for request in requests: 
     print request /// how I can take the message data submitted in my task for process? 

Czy istnieje alternatywny sposób na uzyskanie partii selera? Dziękuję

Odpowiedz

5

Dla każdego, kto znajdzie ten post użyteczne po wielu prób i błędów udało mi się wziąć dane z SimplRequest obiektu w następujący sposób:

Po przesłaniu danych z następujący sposób :

func.delay(data) 

od obiektu żądania można uzyskać args atrybut, który znajduje się lista z danymi:

request.args[0] 
request.args[1] 
etc. 

Jeśli przesłać dane z następujący sposób:

func.apply_async((), {'data': data}, link_error=error_handler.s()) 

następnie dane są dostępne w postaci słownika w kwargs:

request.kwargs['data'] 

Wreszcie, jak widać na powyższym przykładzie musimy zrobić pętlę do wszystkich żądań zebrać partia dane dotyczą

for r in requests: 
     data = r.kwargs['data'] 

byłoby miło dla przykładów w stronę dokumentacji (here), aby być na bieżąco z bardziej prosty i przejrzysty przykład

+0

FYI 'celery.contrib.batches' zostało usunięte w Celery 4 :-(. Patrz: http://docs.celeryproject.org/en/latest/whatsnew-4.0.html?highlight=batches#features-removed-for-simplicity. – illagrenan