Odpowiedź na pytanie "jaki jest najlepszy sposób" zależy w dużym stopniu od sposobu użytkowania kolejek i tego, co rozumie się przez "najlepsze". Ponieważ nie mogę jeszcze komentować pytań, po prostu spróbuję zasugerować kilka możliwych rozwiązań.
W każdym przykładzie zamierzam założyć, że wymiana jest już zadeklarowana.
wątki
można spożywać wiadomości z dwóch kolejkach na oddzielnych hostów w jednym procesie z wykorzystaniem pika
.
Masz rację - jako its own FAQ states, pika
nie jest bezpieczna dla wątków, ale może być używana w trybie wielowątkowym, tworząc połączenia z hostami RabbitMQ na wątek. Dokonywanie ten przykład uruchomić w wątkach stosując moduł threading
wygląda następująco:
import pika
import threading
class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)
self._host = host
# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))
def run(self):
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)
channel.start_consuming()
if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()
Mam deklarowaną callback_func
jako metoda czysto używać ConsumerThread.name
podczas drukowania treści wiadomości. Równie dobrze może to być funkcja spoza klasy ConsumerThread
.
Procesy
Alternatywnie, można zawsze uruchomić tylko jeden proces z kodem konsumentów za kolejce chcesz spożywać wydarzenia.
import pika
import sys
def callback_func(channel, method, properties, body):
print(body)
if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
a następnie uruchomić przez:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
Jeśli praca robisz na wiadomości z kolejek jest CPU-heavy i tak długo, jak liczba rdzeni w CPU> = liczba konsumentów, na ogół lepiej jest stosować to podejście - chyba że twoje koleje są puste przez większość czasu, a konsumenci nie wykorzystają tego czasu procesora *.
asynchroniczny
Inną alternatywą jest wiążą się asynchroniczne ramy (na przykład Twisted
) i całość działa w pojedynczej nici.
Nie można już używać BlockingConnection
w asynchronicznym kodzie; Na szczęście pika
posiada adapter do Twisted
:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log
class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
return self.channel.queue_declare(exclusive=True)
def queue_declared(self, queue):
self._queue_name = queue.method.queue
self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()
parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)
d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)
reactor.run()
Takie podejście byłoby jeszcze lepiej, im więcej kolejek będzie można spożywać od a mniej CPU-bound praca wykonywania przez konsumentów jest *.
Python 3
Skoro już wspomnieliśmy pika
, mam ograniczone się do rozwiązań opartych na Pythona 2.x, ponieważ pika
nie jest jeszcze przeniesiony.
Ale gdybyś chciał przejść do> = 3.3, jedną z możliwych opcji jest użycie asyncio
z jednym z protokołów AMQP (protokół, którym rozmawiasz z RabbitMQ), np. asynqp
lub aioamqp
.
* - proszę zauważyć, że są to bardzo płytkie wskazówki - w większości przypadków wybór nie jest aż tak oczywisty; co będzie dla ciebie najlepsze, zależy od "nasycenia" kolejek (wiadomości/czasu), od tego, jaką pracę wykonujesz po otrzymaniu tych wiadomości, w jakim środowisku prowadzisz swoich klientów itp .; nie ma sposobu, aby mieć pewność, inne niż do benchmarku wszystkie implementacje
Dzięki, bardzo pomocne. – blindsnowmobile
Nie ma za co. :) Zwróciłem także uwagę na jeszcze jedną rzecz w edycji. – Unit03