Szukam sposobu użycia kombu jako adaptera MQ między tornado-sockjs i serwerem aplikacji Django. Zrobiłem coś takiego:Kombu w sposób nieblokujący
class BrokerClient(ConsumerMixin):
clients = []
def __init__(self):
self.connection = BrokerConnection(settings.BROKER_URL)
self.io_loop = ioloop.IOLoop.instance()
self.queue = sockjs_queue
self._handle_loop()
@staticmethod
def instance():
if not hasattr(BrokerClient, '_instance'):
BrokerClient._instance = BrokerClient()
return BrokerClient._instance
def add_client(self, client):
self.clients.append(client)
def remove_client(self, client):
self.clients.remove(client)
def _handle_loop(self):
try:
if self.restart_limit.can_consume(1):
for _ in self.consume(limit=5):
pass
except self.connection.connection_errors:
print ('Connection to broker lost. '
'Trying to re-establish the connection...')
self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
def get_consumers(self, Consumer, channel):
return [Consumer([self.queue, ], callbacks=[self.process_task])]
def process_task(self, body, message):
for client in self.clients:
if hasattr(body, 'users') and client.user.pk in body.users:
client.send(body)
message.ack()
Ale tornado zablokowane przy wykonywaniu _handle_loop (zgodnie z oczekiwaniami).
Czy istnieje sposób, aby temu zapobiec?
Jestem świadomy adaptera biblioteki Pika dla Tornado, ale chciałbym użyć kombu, ponieważ jest już używany w projekcie i ma elastyczne transporty.
UPDATE:
Zmieniono _handle_loop do funkcji generatora
def drain_events(self, callback):
with self.Consumer() as (connection, channel, consumers):
with self.extra_context(connection, channel):
try:
connection.drain_events(timeout=1)
except:
pass
callback(None)
@tornado.gen.engine
def _handle_loop(self):
response = yield tornado.gen.Task(self.drain_events)
self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
UWAGA: W chwili pisania tego artykułu najnowsze py-amqp i kombu oficjalnie nie obsługują asynchronicznych odczytów. [zobacz tę] (https://github.com/celery/py-amqp/issues/25). Istnieją jednak [breadcrumbs] (https://github.com/celery/kombu/blob/master/examples/experimental/async_consume.py) dla konsumpcji asynchronicznej – Realistic