To może nie być idealne, ale w końcu to właśnie wymyśliłem.
Jest to po prostu otokę o nazwie JoinableQueue
umieszczoną na istniejącej kolejce Celery, na podstawie współdzielonego licznika Redis i detektora list. Wymaga, aby nazwa kolejki była taka sama jak klucz routingu (ze względu na wewnętrzne szczegóły implementacji sygnałów before_task_publish
i task_postrun
).
joinableceleryqueue.py:
from celery.signals import before_task_publish, task_postrun
from redis import Redis
import settings
memdb = Redis.from_url(settings.REDIS_URL)
class JoinableCeleryQueue(object):
def __init__(self, queue):
self.queue = queue
self.register_queue_hooks()
def begin(self):
memdb.set(self.count_prop, 0)
@property
def count_prop(self):
return "jqueue:%s:count" % self.queue
@property
def finished_prop(self):
return "jqueue:%s:finished" % self.queue
def task_add(self, routing_key, **kw):
if routing_key != self.queue:
return
memdb.incr(self.count_prop)
def task_done(self, task, **kw):
if task.queue != self.queue:
return
memdb.decr(self.count_prop)
if memdb.get(self.count_prop) == "0":
memdb.rpush(self.finished_prop, 1)
def register_queue_hooks(self):
before_task_publish.connect(self.task_add)
task_postrun.connect(self.task_done)
def join(self):
memdb.brpop(self.finished_prop)
zdecydowałem się użyć BRPOP
zamiast pub/sub jak muszę tylko jeden słuchacz słuchanie "all zakończeniu zadania" zdarzenia (wydawca).
Korzystanie z JoinableCeleryQueue
jest całkiem proste - begin()
przed dodaniem jakichkolwiek zadań do kolejki, dodawanie zadań za pomocą zwykłego interfejsu API selera, .join()
w celu oczekiwania na wykonanie wszystkich zadań.
Czy możesz wyjaśnić, dlaczego "grupa (* zadania)> zastosowanie> dołączanie" nie jest wystarczające? Jak pracownicy wpływają na wyniki Twojej grupy? Dlaczego nie możesz obsłużyć wyników ".join"? – Slam
@Slam Ponieważ to będzie tylko czekać na wykonanie pierwszej grupy zadań. Pracownicy nie mają wpływu na wyniki grupy, dodają więcej zadań do kolejki. Moim zamiarem jest poczekanie na wykonanie wszystkich zadań w kolejce (podobnie jak mogę "join()' 'JoinableQueue', który czekałby na' task_done() 'na każde zadanie w kolejce). Prawdopodobnie mógłbym to osiągnąć, tworząc dodatkowy współdzielony licznik i pub/sub w Redis, ale zastanawiałem się, czy istnieje czystsza i bardziej niezawodna metoda z wykorzystaniem właśnie Seler. –