2013-09-05 15 views
6

Obecnie używam django z selerem i wszystko działa dobrze.Django Seler uzyskać liczbę zadań

Jednak chcę móc dać użytkownikom możliwość anulowania zadania, jeśli serwer jest przeciążony przez sprawdzenie, ile zadań jest obecnie zaplanowanych.

Jak mogę to osiągnąć?

Używam redis jako brokera.

Właśnie znalazłem to: Retrieve list of tasks in a queue in Celery

Jest jakiś sposób odnoszą się do mojego problemu, ale ja nie potrzebuję do listy zadań, wystarczy policzyć :)

Odpowiedz

8

Jeśli broker jest skonfigurowany jako redis://localhost:6379/1, i wasze zadania są przedstawione na walnym celery kolejce, a następnie można uzyskać długość za pomocą następujących środków:

import redis 
queue_name = "celery" 
client = redis.Redis(host="localhost", port=6379, db=1) 
length = client.llen(queue_name) 

Albo ze skryptu powłoki (dobre dla monitorów i takie):

$ redis-cli -n 1 -h localhost -p 6379 llen celery 
+0

Nawet jeśli jest to właściwe rozwiązanie dla maklera Redis, proszę zaznaczyć @stephen komentarz Fuhry jako prawidłowe rozwiązanie, ponieważ jest broker agnostyk . –

4

Jeśli masz już skonfigurowane Redis w swojej aplikacji, możesz spróbować tego:

from celery import Celery 

QUEUE_NAME = 'celery' 

celery = Celery(app) 
client = celery.connection().channel().client 

length = client.llen(QUEUE_NAME) 
+0

Dla redis, 'client = app.broker_connection(). Channel(). Client' –

7

Oto w jaki sposób można uzyskać liczbę wiadomości w kolejce przy użyciu selera, który jest broker- agnostyk.

Korzystając z connection_or_acquire, można zminimalizować liczbę otwartych połączeń z brokerem, wykorzystując wewnętrzne połączenie puli selera.

celery = Celery(app) 

with celery.connection_or_acquire() as conn: 
    conn.default_channel.queue_declare(
     queue='my-queue', passive=True).message_count 

Możesz również rozszerzyć Seler, aby zapewnić tę funkcjonalność:

from celery import Celery as _Celery 


class Celery(_Celery) 

    def get_message_count(self, queue): 
     ''' 
     Raises: amqp.exceptions.NotFound: if queue does not exist 
     ''' 
     with self.connection_or_acquire() as conn: 
      return conn.default_channel.queue_declare(
       queue=queue, passive=True).message_count 


celery = Celery(app) 
num_messages = celery.get_message_count('my-queue') 
+2

Proszę podać jakieś wyjaśnienie również, aby wesprzeć odpowiedź. – Lal

+0

@Lal Dodano pewne wyjaśnienie podejścia - nadzieja, która pomaga! –

+1

amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND - brak kolejki 'default' w vhost '/' Ponieważ moja kolejka nie znajduje się na hostu '/', jest na hoście '/ apples'. Jak dostać się do tego hosta? – Simanas