2015-01-26 13 views
6

Czy istnieje standardowa metoda na wyjaśnianie zadań w Celery?Oddać zadania Celery?

Na przykład tak, że to zadanie może być „zaczął” kilka razy, ale zostanie uruchomiony tylko raz po pewnym opóźnieniem:

def debounce_task(task): 
    if task_is_queued(task): 
     return 
    task.apply_async(countdown=30) 
+0

Możesz użyć pamięci podręcznej. Wiele magazynów kluczy ma zapisy czasowe, spróbuj pobrać wynik ze sklepu, jeśli nie ma wyniku, uruchom zadanie i zapisz wynik z czasem wygaśnięcia przed powrotem. Użyj tylko jednego pracownika, aby zadania były wykonywane sekwencyjnie. Unikaj schematów blokowania, chyba że chcesz uporać się z nieaktualnymi blokadami. –

+0

Och, absolutnie. Ale wolałbym uniknąć skomplikowanych fragmentów implementacji ujawniania siebie (sprawdzanie argumentów, wyników śledzenia itp.) I zastanawiam się, czy istnieje jakikolwiek standardowy sposób robienia tego. –

+0

Pisanie dekoratora pamięci podręcznej w Pythonie jest bardzo proste (może to być 4 linie), szkoda, że ​​nie miałem czasu opublikować pełnej odpowiedzi. –

Odpowiedz

4

Oto jak to zrobić z liczników Redis. Wszystko to prawdopodobnie może być uogólnione w dekoratorze, ale używamy go tylko do określonego zadania (webhooks).

Twoim publicznym zadaniem jest to, co wywołujesz z innych funkcji. Będzie potrzebował zwiększyć klucz w Redis. Kluczem jest utworzona przez argumenty swojej funkcji, co może być (zapewnia to licznik jest wyjątkowa pośród poszczególnych zadań)

@task 
def your_public_task(*args, **kwargs): 
    cache_key = make_public_task_cache_key(*args, **kwargs) 
    get_redis().incr(cache_key) 
    _your_task(*args, **kwargs, countdown=settings.QUEUE_DELAY) 

Uwaga funkcje kluczowych cache są dzielone (chcesz tego samego klucza pamięci podręcznej w każdej funkcji) i ustawienie countdown.

Następnie rzeczywiste zadanie wykonywanie kodu wykonuje następujące operacje:

@task 
def _your_task(*args, **kwargs): 
    cache_key = make_public_task_cache_key(*args, **kwargs) 
    counter = get_redis().getset(cache_key, 0) 
    # redis makes the zero a string. 
    if counter == '0': 
     return 

    ... execute your actual task code. 

Pozwala to hit your_public_task.delay(..) tyle razy, ile chcesz, w swoim QUEUE_DELAY, i będzie to tylko raz wystrzelić.

1

Oto, jak możesz to zrobić z Mongo.

UWAGA: Musiałem uczynić projekt nieco bardziej wyrozumiałym, ponieważ zadania Celery nie gwarantują wykonania dokładnie momentu, w którym eta jest spełniony lub skończy się countdown.

Ponadto, wygasające indeksy Mongo są czyszczone co minutę; Nie można opierać projektu wokół rekordów, które są usuwane, gdy tylko pojawi się eta.

Tak czy inaczej, przepływ jest coś takiego:

  1. Kod Klient zwraca my_task.
  2. preflight zwiększa licznik połączeń i zwraca go jako flight_id
  3. _my_task ma zostać wykonany po TTL sekund.
  4. Po uruchomieniu _my_task sprawdza, czy numer flight_id jest nadal aktualny. Jeśli nie, przerwie się.
  5. ... Jakiś czas później ... Mongo sprząta nieaktualne wpisy w kolekcji via an expiring index.

@celery.task(track_started=False, ignore_result=True) 
def my_task(my_arg): 
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL) 
    _my_task.apply_async((my_arg,), {'flight_id':flight_id}, countdown=TTL) 

@celery.task(track_started=False, ignore_result=True) 
def _my_task(my_arg, flight_id=None): 
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id): 
     return 
    # ... actual work ... # 

kod Biblioteka:

TTL = 5 * 60  # Run tasks after 5 minutes 
EXPIRY = 6 * TTL # This needs to be much larger than TTL. 

# We need to store a list of task-executions currently pending 
inflight_collection = db['celery_In_Flight'] 
inflight_collection.create_index([('fn', pymongo.ASCENDING,), 
            ('key', pymongo.ASCENDING,)]) 
inflight_collection.create_index('eta', expiresAfterSeconds=EXPIRY) 


def preflight(collection, fn, key, ttl): 
    eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl) 
    result = collection.find_one_and_update({ 
     'fn': fn, 
     'key': key, 
    }, { 
     '$set': { 
      'eta': eta 
     }, 
     '$inc': { 
      'flightId': 1 
     } 
    }, upsert=True, return_document=pymongo.ReturnDocument.AFTER) 
    print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId']) 
    return result['flightId'] 


def check_for_takeoff(collection, fn, key, flight_id): 
    result = collection.find_one({ 
     'fn': fn, 
     'key': key 
    }) 
    ready = result is None or result['flightId'] == flight_id 
    print 'Check[{}][{}] = {}, {}'.format(fn, key, result['flightId'], ready) 
    return ready 
0

Bartek ma pomysł , używaj liczników redis, które są atomowe (i powinny być łatwo dostępne, jeśli twoim brokerem jest redis). Chociaż jego rozwiązanie jest dudniące, a nie demolka. Różnica jest jednak niewielka (getset vs decr).

kolejce zadanie:

conn = get_redis() 
conn.incr(key) 
task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

Następnie w zadaniu:

conn = get_redis() 
counter = conn.decr(key) 
if counter > 0: 
    # task is still queued 
    return 
# continue on to rest of task 

Trudno, aby to dekorator ponieważ trzeba ozdobić zadanie i powołanie samego zadania. Więc będziesz potrzebował dekoratora przed dekoratorem selera @task i jeden po nim.

Na razie mam tylko kilka funkcji, które pomagają mi wywoływać zadanie, i które sprawdza na początku zadania.

+0

http://stackoverflow.com/a/43625455/4391298 to rozwiązanie, na które w końcu mogłem sobie poradzić, włączając w to kluczową datę wygaśnięcia, aby obsłużyć to stopienie (nie stanowi to problemu w oryginalnym rozwiązaniu). –

0

Oto rozwiązanie wymyśliłem: https://gist.github.com/wolever/3cf2305613052f3810a271e09d42e35c

i kopiowane tu dla potomnych:

import time 

import redis 


def get_redis_connection(): 
    return redis.connect() 

class TaskDebouncer(object): 
    """ A simple Celery task debouncer. 

     Usage:: 

      def debounce_process_corpus(corpus): 
       # Only one task with ``key`` will be allowed to execute at a 
       # time. For example, if the task was resizing an image, the key 
       # might be the image's URL. 
       key = "process_corpus:%s" %(corpus.id,) 
       TaskDebouncer.delay(
        key, my_taks, args=[corpus.id], countdown=0, 
       ) 

      @task(bind=True) 
      def process_corpus(self, corpus_id, debounce_key=None): 
       debounce = TaskDebouncer(debounce_key, keepalive=30) 

       corpus = Corpus.load(corpus_id) 

       try: 
        for item in corpus: 
         item.process() 

         # If ``debounce.keepalive()`` isn't called every 
         # ``keepalive`` interval (the ``keepalive=30`` in the 
         # call to ``TaskDebouncer(...)``) the task will be 
         # considered dead and another one will be allowed to 
         # start. 
         debounce.keepalive() 

       finally: 
        # ``finalize()`` will mark the task as complete and allow 
        # subsequent tasks to execute. If it returns true, there 
        # was another attempt to start a task with the same key 
        # while this task was running. Depending on your business 
        # logic, this might indicate that the task should be 
        # retried. 
        needs_retry = debounce.finalize() 

       if needs_retry: 
        raise self.retry(max_retries=None) 

    """ 

    def __init__(self, key, keepalive=60): 
     if key: 
      self.key = key.partition("!")[0] 
      self.run_key = key 
     else: 
      self.key = None 
      self.run_key = None 
     self._keepalive = keepalive 
     self.cxn = get_redis_connection() 
     self.init() 
     self.keepalive() 

    @classmethod 
    def delay(cls, key, task, args=None, kwargs=None, countdown=30): 
     cxn = get_redis_connection() 
     now = int(time.time()) 
     first = cxn.set(key, now, nx=True, ex=countdown + 10) 
     if not first: 
      now = cxn.get(key) 

     run_key = "%s!%s" %(key, now) 
     if first: 
      kwargs = dict(kwargs or {}) 
      kwargs["debounce_key"] = run_key 
      task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

     return (first, run_key) 

    def init(self): 
     self.initial = self.key and self.cxn.get(self.key) 

    def keepalive(self, expire=None): 
     if self.key is None: 
      return 
     expire = expire if expire is not None else self._keepalive 
     self.cxn.expire(self.key, expire) 

    def is_out_of_date(self): 
     if self.key is None: 
      return False 
     return self.cxn.get(self.key) != self.initial 

    def finalize(self): 
     if self.key is None: 
      return False 
     with self.cxn.pipeline() as pipe: 
      while True: 
       try: 
        pipe.watch(self.key) 
        if pipe.get(self.key) != self.initial: 
         return True 
        pipe.multi() 
        pipe.delete(self.key) 
        pipe.execute() 
        break 
       except redis.WatchError: 
        continue 
     return False 
0

Oto bardziej wypełnione rozwiązanie oparte off https://stackoverflow.com/a/28157498/4391298 ale zamienił się w dekoratora i sięgając do Kombu puli połączeń, aby ponownie wykorzystać swój licznik Redis.

import logging 
from functools import wraps 

# Not strictly required 
from django.core.exceptions import ImproperlyConfigured 
from django.core.cache.utils import make_template_fragment_key 

from celery.utils import gen_task_name 


LOGGER = logging.getLogger(__name__) 


def debounced_task(**options): 
    """Debounced task decorator.""" 

    try: 
     countdown = options.pop('countdown') 
    except KeyError: 
     raise ImproperlyConfigured("Debounced tasks require a countdown") 

    def factory(func): 
     """Decorator factory.""" 
     try: 
      name = options.pop('name') 
     except KeyError: 
      name = gen_task_name(app, func.__name__, func.__module__) 

     @wraps(func) 
     def inner(*args, **kwargs): 
      """Decorated function.""" 

      key = make_template_fragment_key(name, [args, kwargs]) 
      with app.pool.acquire_channel(block=True) as (_, channel): 
       depth = channel.client.decr(key) 

       if depth <= 0: 
        try: 
         func(*args, **kwargs) 
        except: 
         # The task failed (or is going to retry), set the 
         # count back to where it was 
         channel.client.set(key, depth) 
         raise 
       else: 
        LOGGER.debug("%s calls pending to %s", 
           depth, name) 

     task = app._task_from_fun(inner, **options, name=name + '__debounced') 

     @wraps(func) 
     def debouncer(*args, **kwargs): 
      """ 
      Debouncer that calls the real task. 
      This is the task we are scheduling.""" 

      key = make_template_fragment_key(name, [args, kwargs]) 
      with app.pool.acquire_channel(block=True) as (_, channel): 
       # Mark this key to expire after the countdown, in case our 
       # task never runs or runs too many times, we want to clean 
       # up our Redis to eventually resolve the issue. 
       channel.client.expire(key, countdown + 10) 
       depth = channel.client.incr(key) 

      LOGGER.debug("Requesting %s in %i seconds (depth=%s)", 
         name, countdown, depth) 
      task.si(*args, **kwargs).apply_async(countdown=countdown) 

     return app._task_from_fun(debouncer, **options, name=name) 

    return factory