2011-12-14 10 views
33

Używam Celery do zarządzania zadaniami asynchronicznymi. Czasami jednak proces selekcji spada, co nie powoduje wykonania żadnego zadania. Chciałbym móc sprawdzić stan selera i upewnić się, że wszystko działa poprawnie, a jeśli wykryję jakiekolwiek problemy, wyświetl komunikat o błędzie dla użytkownika. Z dokumentacji Selera Worker wygląda na to, że mógłbym użyć do tego celu ping lub inspect, ale ping wydaje się być hackowaty i nie jest jasne, w jaki dokładnie sposób ma być używana inspekcja (jeśli inspect(). Registered() jest pusty?).Wykrywanie, czy Seler jest dostępny/działa

Wszelkie wskazówki na ten temat będą mile widziane. Zasadniczo co szukam to metoda tak:

def celery_is_alive(): 
    from celery.task.control import inspect 
    return bool(inspect().registered()) # is this right?? 

EDIT: To nawet nie wygląda zarejestrowany() jest dostępna na selera 2.3.3 (chociaż wymienić go na 2,1 docs). Może ping jest właściwą odpowiedzią.

EDYCJA: Ping również nie wygląda tak, jak myślałem, więc nadal nie jestem pewien odpowiedzi tutaj.

+0

Czy odpowiedź poniżej nie pracować dla Ciebie? Jako ktoś, kto ma podobny problem do rozwiązania, chciałbym potwierdzić. – kojiro

Odpowiedz

44

Oto kod, którego używam. celery.task.control.Inspect.stats() zwraca dict zawierający wiele szczegółowych informacji o aktualnie dostępnych pracownikach, Brak, jeśli nie ma uruchomionych pracowników, lub podnosi IOError, jeśli nie może połączyć się z brokerem wiadomości. Używam RabbitMQ - możliwe, że inne systemy przesyłania wiadomości mogą działać nieco inaczej. To działało w systemach Celery 2.3.x i 2.4.x; Nie jestem pewien, jak daleko się posuwa.

def get_celery_worker_status(): 
    ERROR_KEY = "ERROR" 
    try: 
     from celery.task.control import inspect 
     insp = inspect() 
     d = insp.stats() 
     if not d: 
      d = { ERROR_KEY: 'No running Celery workers were found.' } 
    except IOError as e: 
     from errno import errorcode 
     msg = "Error connecting to the backend: " + str(e) 
     if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': 
      msg += ' Check that the RabbitMQ server is running.' 
     d = { ERROR_KEY: msg } 
    except ImportError as e: 
     d = { ERROR_KEY: str(e)} 
    return d 
+0

Pracowałem dla mnie :) – kojiro

+6

Odkryłem, że powyższe dodaje dwie kolejki reply.celery.pidbox do rabbitmq przy każdym uruchomieniu. Prowadzi to do przyrostowego zwiększenia wykorzystania pamięci rabbitmq. – kojiro

2

Następujące pracował dla mnie:

import socket 
from kombu import Connection 

celery_broker_url = "amqp://localhost" 

try: 
    conn = Connection(celery_broker_url) 
    conn.ensure_connection(max_retries=3) 
except socket.error: 
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url)) 
+2

Jestem prawie pewien, że to się powiedzie, jeśli rabbitmq będzie działał niezależnie od statusu selera. Ale to jest dobra okazja do zrobienia, jeśli seler nie zda sobie sprawy, czy porażka jest z rabbitmq czy czymś innym. –