2010-01-25 16 views
16

Mam projekt Django i staram się używać Celery do przesyłania zadań do przetwarzania w tle (http://ask.github.com/celery/introduction.html). Seler dobrze się integruje z Django i mogłem przesłać swoje niestandardowe zadania i uzyskać wyniki.Jak skonfigurować program Seler, aby wywoływał niestandardową funkcję inicjującą przed uruchomieniem moich zadań?

Jedynym problemem jest to, że nie mogę znaleźć rozsądnego sposobu wykonywania niestandardowej inicjalizacji w procesie demona. Muszę zadzwonić do drogiej funkcji, która ładuje dużo pamięci, zanim zacznę przetwarzać zadania, i nie mogę sobie pozwolić na wywołanie tej funkcji za każdym razem.

Czy ktoś miał wcześniej ten problem? Wszelkie pomysły na obejście tego bez modyfikowania kodu źródłowego Seler?

Dzięki

+0

jakiego rodzaju inicjowania niestandardowego trzeba uruchomić? – diegueus9

+0

Potrzebuję załadować strukturę danych o wielkości ~ 10 MB, która jest wymagana do przetwarzania każdego zadania (struktura jest taka sama dla wszystkich zadań). – xelk

Odpowiedz

15

Można też napisać ładowarka niestandardową lub użyć sygnałów.

Ładowarki mieć metodę on_task_init, która jest wywoływana, gdy zadanie ma być wykonane, i on_worker_init która jest wywoływana przez selera + celerybeat procesu głównego.

Używanie sygnałów jest chyba najprostszy, sygnały dostępne są:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Wywoływane, gdy zadanie ma być wykonywane przez pracownika (lub lokalnie jeśli używasz apply/lub jeśli został ustawiony CELERY_ALWAYS_EAGER).

  • task_postrun(task_id, task, args, kwargs, retval) Wywoływane po wykonaniu zadania w takich samych warunkach jak powyżej.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Wywoływana, gdy zadanie jest stosowana (nie dobre dla długotrwałych operacji) dostępny

Dodatkowe sygnały w 0.9.x (aktualny mistrz oddział na github):

  • worker_init()

    Wywoływana po uruchomieniu seleryd (przed zainicjowaniem zadania, więc jeśli system obsługuje obsługę fork, wszelkie zmiany pamięci zostaną skopiowane do procesów roboczych potomka ).

  • worker_ready()

    Wywoływana gdy celeryd jest w stanie odbierać zadania.

  • worker_shutdown()

    Wywoływana gdy celeryd jest zamykany.

Oto przykład precalculating coś po raz pierwszy zadanie jest uruchamiane w procesie:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

Jeśli chcesz funkcja być prowadzony dla wszystkich zadań, po prostu pominąć sender argument.