2016-09-08 57 views
7

Jaki byłby odpowiednik selera dla multiprocessing.JoinableQueue (lub gevent.queue.JoinableQueue)?Ekwiwalent selera dla JoinableQueue

Funkcja, której szukam, to możliwość .join() kolejki zadań dla selera od wydawcy, oczekującej na wykonanie wszystkich zadań w kolejce.

Oczekiwanie na początkowe AsyncResult lub GroupResult nie będzie wystarczające, ponieważ kolejka dynamicznie wypełnia się samymi pracownikami.

+0

Czy możesz wyjaśnić, dlaczego "grupa (* zadania)> zastosowanie> dołączanie" nie jest wystarczające? Jak pracownicy wpływają na wyniki Twojej grupy? Dlaczego nie możesz obsłużyć wyników ".join"? – Slam

+0

@Slam Ponieważ to będzie tylko czekać na wykonanie pierwszej grupy zadań. Pracownicy nie mają wpływu na wyniki grupy, dodają więcej zadań do kolejki. Moim zamiarem jest poczekanie na wykonanie wszystkich zadań w kolejce (podobnie jak mogę "join()' 'JoinableQueue', który czekałby na' task_done() 'na każde zadanie w kolejce). Prawdopodobnie mógłbym to osiągnąć, tworząc dodatkowy współdzielony licznik i pub/sub w Redis, ale zastanawiałem się, czy istnieje czystsza i bardziej niezawodna metoda z wykorzystaniem właśnie Seler. –

Odpowiedz

0

To może nie być idealne, ale w końcu to właśnie wymyśliłem.

Jest to po prostu otokę o nazwie JoinableQueue umieszczoną na istniejącej kolejce Celery, na podstawie współdzielonego licznika Redis i detektora list. Wymaga, aby nazwa kolejki była taka sama jak klucz routingu (ze względu na wewnętrzne szczegóły implementacji sygnałów before_task_publish i task_postrun).

joinableceleryqueue.py:

from celery.signals import before_task_publish, task_postrun 
from redis import Redis 
import settings 

memdb = Redis.from_url(settings.REDIS_URL) 

class JoinableCeleryQueue(object): 
    def __init__(self, queue): 
     self.queue = queue 
     self.register_queue_hooks() 

    def begin(self): 
     memdb.set(self.count_prop, 0) 

    @property 
    def count_prop(self): 
     return "jqueue:%s:count" % self.queue 

    @property 
    def finished_prop(self): 
     return "jqueue:%s:finished" % self.queue 

    def task_add(self, routing_key, **kw): 
     if routing_key != self.queue: 
      return 

     memdb.incr(self.count_prop) 

    def task_done(self, task, **kw): 
     if task.queue != self.queue: 
      return 

     memdb.decr(self.count_prop) 
     if memdb.get(self.count_prop) == "0": 
      memdb.rpush(self.finished_prop, 1) 

    def register_queue_hooks(self): 
     before_task_publish.connect(self.task_add) 
     task_postrun.connect(self.task_done) 

    def join(self): 
     memdb.brpop(self.finished_prop) 

zdecydowałem się użyć BRPOP zamiast pub/sub jak muszę tylko jeden słuchacz słuchanie "all zakończeniu zadania" zdarzenia (wydawca).

Korzystanie z JoinableCeleryQueue jest całkiem proste - begin() przed dodaniem jakichkolwiek zadań do kolejki, dodawanie zadań za pomocą zwykłego interfejsu API selera, .join() w celu oczekiwania na wykonanie wszystkich zadań.