2015-02-16 26 views
5

Mam dwie oddzielne instancje RabbitMQ. Próbuję znaleźć najlepszy sposób na słuchanie wydarzeń z obu.Python and RabbitMQ - Najlepszy sposób na słuchanie zdarzeń na różnych kanałach?

Na przykład, mogę spożywać zdarzeń na jednym z następujących:

credentials = pika.PlainCredentials(user, pass) 
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials)) 
channel = connection.channel() 
result = channel.queue_declare(Exclusive=True) 
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*') 
channel.basic_consume(callback_func, result.method.queue, no_ack=True) 
self.channel.start_consuming() 

mam drugi gospodarz „host2”, że chciałbym, aby słuchać, jak również. Pomyślałem o stworzeniu dwóch osobnych wątków do zrobienia tego, ale z tego, co przeczytałem, pika nie jest bezpieczna dla wątków. Czy istnieje lepszy sposób? Czy może tworzenie dwóch osobnych wątków, każdy słuchanie innej instancji Rabbit (host1 i host2) jest wystarczający?

Odpowiedz

24

Odpowiedź na pytanie "jaki jest najlepszy sposób" zależy w dużym stopniu od sposobu użytkowania kolejek i tego, co rozumie się przez "najlepsze". Ponieważ nie mogę jeszcze komentować pytań, po prostu spróbuję zasugerować kilka możliwych rozwiązań.

W każdym przykładzie zamierzam założyć, że wymiana jest już zadeklarowana.

wątki

można spożywać wiadomości z dwóch kolejkach na oddzielnych hostów w jednym procesie z wykorzystaniem pika.

Masz rację - jako its own FAQ states, pika nie jest bezpieczna dla wątków, ale może być używana w trybie wielowątkowym, tworząc połączenia z hostami RabbitMQ na wątek. Dokonywanie ten przykład uruchomić w wątkach stosując moduł threading wygląda następująco:

import pika 
import threading 


class ConsumerThread(threading.Thread): 
    def __init__(self, host, *args, **kwargs): 
     super(ConsumerThread, self).__init__(*args, **kwargs) 

     self._host = host 

    # Not necessarily a method. 
    def callback_func(self, channel, method, properties, body): 
     print("{} received '{}'".format(self.name, body)) 

    def run(self): 
     credentials = pika.PlainCredentials("guest", "guest") 

     connection = pika.BlockingConnection(
      pika.ConnectionParameters(host=self._host, 
             credentials=credentials)) 

     channel = connection.channel() 

     result = channel.queue_declare(exclusive=True) 

     channel.queue_bind(result.method.queue, 
          exchange="my-exchange", 
          routing_key="*.*.*.*.*") 

     channel.basic_consume(self.callback_func, 
           result.method.queue, 
           no_ack=True) 

     channel.start_consuming() 


if __name__ == "__main__": 
    threads = [ConsumerThread("host1"), ConsumerThread("host2")] 
    for thread in threads: 
     thread.start() 

Mam deklarowaną callback_func jako metoda czysto używać ConsumerThread.name podczas drukowania treści wiadomości. Równie dobrze może to być funkcja spoza klasy ConsumerThread.

Procesy

Alternatywnie, można zawsze uruchomić tylko jeden proces z kodem konsumentów za kolejce chcesz spożywać wydarzenia.

import pika 
import sys 


def callback_func(channel, method, properties, body): 
    print(body) 


if __name__ == "__main__": 
    credentials = pika.PlainCredentials("guest", "guest") 

    connection = pika.BlockingConnection(
     pika.ConnectionParameters(host=sys.argv[1], 
            credentials=credentials)) 

    channel = connection.channel() 

    result = channel.queue_declare(exclusive=True) 

    channel.queue_bind(result.method.queue, 
         exchange="my-exchange", 
         routing_key="*.*.*.*.*") 

    channel.basic_consume(callback_func, result.method.queue, no_ack=True) 

    channel.start_consuming() 

a następnie uruchomić przez:

$ python single_consume.py host1 
$ python single_consume.py host2 # e.g. on another console 

Jeśli praca robisz na wiadomości z kolejek jest CPU-heavy i tak długo, jak liczba rdzeni w CPU> = liczba konsumentów, na ogół lepiej jest stosować to podejście - chyba że twoje koleje są puste przez większość czasu, a konsumenci nie wykorzystają tego czasu procesora *.

asynchroniczny

Inną alternatywą jest wiążą się asynchroniczne ramy (na przykład Twisted) i całość działa w pojedynczej nici.

Nie można już używać BlockingConnection w asynchronicznym kodzie; Na szczęście pika posiada adapter do Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection 
from pika.connection import ConnectionParameters 
from twisted.internet import protocol, reactor, task 
from twisted.python import log 


class Consumer(object): 
    def on_connected(self, connection): 
     d = connection.channel() 
     d.addCallback(self.got_channel) 
     d.addCallback(self.queue_declared) 
     d.addCallback(self.queue_bound) 
     d.addCallback(self.handle_deliveries) 
     d.addErrback(log.err) 

    def got_channel(self, channel): 
     self.channel = channel 

     return self.channel.queue_declare(exclusive=True) 

    def queue_declared(self, queue): 
     self._queue_name = queue.method.queue 

     self.channel.queue_bind(queue=self._queue_name, 
           exchange="my-exchange", 
           routing_key="*.*.*.*.*") 

    def queue_bound(self, ignored): 
     return self.channel.basic_consume(queue=self._queue_name) 

    def handle_deliveries(self, queue_and_consumer_tag): 
     queue, consumer_tag = queue_and_consumer_tag 
     self.looping_call = task.LoopingCall(self.consume_from_queue, queue) 

     return self.looping_call.start(0) 

    def consume_from_queue(self, queue): 
     d = queue.get() 

     return d.addCallback(lambda result: self.handle_payload(*result)) 

    def handle_payload(self, channel, method, properties, body): 
     print(body) 


if __name__ == "__main__": 
    consumer1 = Consumer() 
    consumer2 = Consumer() 

    parameters = ConnectionParameters() 
    cc = protocol.ClientCreator(reactor, 
           TwistedProtocolConnection, 
           parameters) 
    d1 = cc.connectTCP("host1", 5672) 
    d1.addCallback(lambda protocol: protocol.ready) 
    d1.addCallback(consumer1.on_connected) 
    d1.addErrback(log.err) 

    d2 = cc.connectTCP("host2", 5672) 
    d2.addCallback(lambda protocol: protocol.ready) 
    d2.addCallback(consumer2.on_connected) 
    d2.addErrback(log.err) 

    reactor.run() 

Takie podejście byłoby jeszcze lepiej, im więcej kolejek będzie można spożywać od a mniej CPU-bound praca wykonywania przez konsumentów jest *.

Python 3

Skoro już wspomnieliśmy pika, mam ograniczone się do rozwiązań opartych na Pythona 2.x, ponieważ pika nie jest jeszcze przeniesiony.

Ale gdybyś chciał przejść do> = 3.3, jedną z możliwych opcji jest użycie asyncio z jednym z protokołów AMQP (protokół, którym rozmawiasz z RabbitMQ), np. asynqp lub aioamqp.

* - proszę zauważyć, że są to bardzo płytkie wskazówki - w większości przypadków wybór nie jest aż tak oczywisty; co będzie dla ciebie najlepsze, zależy od "nasycenia" kolejek (wiadomości/czasu), od tego, jaką pracę wykonujesz po otrzymaniu tych wiadomości, w jakim środowisku prowadzisz swoich klientów itp .; nie ma sposobu, aby mieć pewność, inne niż do benchmarku wszystkie implementacje

+0

Dzięki, bardzo pomocne. – blindsnowmobile

+0

Nie ma za co. :) Zwróciłem także uwagę na jeszcze jedną rzecz w edycji. – Unit03

0

Poniżej jest przykład jak użyć jednej instancji RabbitMQ słuchać 2 kolejkach w tym samym czasie:

import pika 
import threading 

threads=[] 
def client_info(channel):  
    channel.queue_declare(queue='proxy-python') 
    print (' [*] Waiting for client messages. To exit press CTRL+C') 


    def callback(ch, method, properties, body): 
     print (" Received %s" % (body)) 

    channel.basic_consume(callback, queue='proxy-python', no_ack=True) 
    channel.start_consuming() 

def scenario_info(channel):  
    channel.queue_declare(queue='savi-virnet-python') 
    print (' [*] Waiting for scenrio messages. To exit press CTRL+C') 


    def callback(ch, method, properties, body): 
     print (" Received %s" % (body)) 

    channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True) 
    channel.start_consuming() 

def manager(): 
    connection1= pika.BlockingConnection(pika.ConnectionParameters 
    (host='localhost')) 
    channel1 = connection1.channel() 
    connection2= pika.BlockingConnection(pika.ConnectionParameters 
    (host='localhost')) 
    channel2 = connection2.channel() 
    t1 = threading.Thread(target=client_info, args=(channel1,)) 
    t1.daemon = True 
    threads.append(t1) 
    t1.start() 

    t2 = threading.Thread(target=scenario_info, args=(channel2,)) 
    t2.daemon = True 
    threads.append(t2) 


    t2.start() 
    for t in threads: 
    t.join() 


manager()