2013-02-28 11 views
11

Próbuję znaleźć asynchronicznie korzystać z Redis i Tornado. Znalazłem tornado-redis, ale potrzebuję czegoś więcej niż tylko dodać kod yield.Jak mogę asynchronicznie używać Tornado i Redis?

Mam następujący kod:

import redis 
import tornado.web 

class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = redis.StrictRedis(port=6279) 
     pubsub = client.pubsub() 
     pubsub.subscribe('test_channel') 

     for item in pubsub.listen(): 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 

     self.write(item['data']) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    tornado.ioloop.IOLoop.instance().start() 

Potrzebuję uzyskanie dostępu do / URL i uzyskać "Hello World", a nie wniosek czeka w /wait. Jak mogę to zrobić?

+1

Redis pub/sub nie powinien być stosowany w 'web.RequestHandler', ponieważ będzie blokować ioloop czekając na' PubSub .listen() '. Zajrzyj do http://tornadogists.org/532067/ dla przykładu działania websocket. –

+0

Websocket jest dobrym rozwiązaniem, jednak moja aplikacja wymaga pracy w przeglądarkach, które nie obsługują websockets. Używam długiego głosowania. Z tego powodu potrzebuję "async get". –

+0

@HelieelsonSantos w takim przypadku najlepiej jest zachować lokalny stan historii zasubskrybowanego kanału (podawanego przez osobny wątek), a następnie zapisać ten stan natychmiast do odpowiedzi i wykonać operację 'get'. Klient powinien utrzymywać rekordy ostatnio uzyskanego indeksu lub ostatniego czasu uzyskania itp., Co pozwala zachować ciągłość dla różnych klientów. Napiszę odpowiedź z przykładem za kilka godzin, kiedy znajdę czas. –

Odpowiedz

5

Nie należy używać Redis pub/sub w głównym wątku Tornado, ponieważ spowoduje to zablokowanie pętli IO. Możesz obsłużyć długi polling z klientów sieciowych w głównym wątku, ale powinieneś utworzyć osobny wątek do odsłuchu Redis. Następnie można używać ioloop.add_callback() i/lub threading.Queue do komunikowania się z głównym wątkiem podczas odbierania wiadomości.

1

Ok, więc tutaj jest mój przykład, jak byłoby to zrobić z żądań GET.

Dodałem dwa główne komponenty:

Pierwszy jest prosty gwintowany PubSub słuchacz, który dodaje nowe wiadomości do lokalnej listy obiektów. Dodałem także listę akcesorów do klasy, dzięki czemu możesz czytać z wątku odsłuchiwania tak, jakbyś czytał ze zwykłej listy. Jeśli chodzi o twój numer WebRequest, właśnie czytasz dane z lokalnego obiektu listy. Ta funkcja zwraca natychmiast i nie blokuje akceptowania i przetwarzania bieżących żądań dotyczących wypełniania lub przyszłych żądań.

class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 

Druga to klasa ApplicationMixin. Jest to obiekt wtórny, w którym dziedziczy się klasa żądań stron w celu dodania funkcjonalności i atrybutów. W tym przypadku sprawdza, czy detektor kanałów już istnieje dla żądanego kanału, tworzy jedną, jeśli żadna nie została znaleziona, i zwraca uchwyt odbiornika do WebRequest.

# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

WebRequest klasa teraz traktuje słuchacza tak, jakby to była lista statyczne (pamiętając, że trzeba dać self.write ciąg)

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 

Wreszcie, po aplikacji jest tworzony, dodałem pusta słowniku jako atrybut

# add a dictionary containing channels to your application 
application.channels = {} 

jak również niektóre porządki z uruchomionych wątków, po wyjściu z aplikacji

# clean up the subscribed channels 
for channel in application.channels: 
    application.channels[channel].stop() 
    application.channels[channel].join() 

Kompletny kod:

import threading 
import redis 
import tornado.web 



class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 


# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/channel/(?P<channel>\S+)", ReadChannel), 
]) 


# add a dictionary containing channels to your application 
application.channels = {} 


if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    try: 
     tornado.ioloop.IOLoop.instance().start() 
    except KeyboardInterrupt: 
     pass 

    # clean up the subscribed channels 
    for channel in application.channels: 
     application.channels[channel].stop() 
     application.channels[channel].join() 
+0

Możesz łatwo zastąpić listę kolejką lub innym obiektem obsługującym nieblokujący dostęp i zwracać tylko wiadomości odebrane od poprzedniego żądania. Trzeba jednak utrzymywać kolejkę dla każdego klienta i upewnić się, że nie blokuje się i poprawnie obsługuje wyjątki 'Empty'. –

2

Pythona> = 3.3, radzę użyć aioredis. nie przetestować kod poniżej, ale powinno to być coś takiego:

import redis 
import tornado.web 
from tornado.web import RequestHandler 

import aioredis 
import asyncio 
from aioredis.pubsub import Receiver 


class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop) 

     ch = redis.channels['test_channel'] 
     result = None 
     while await ch.wait_message(): 
      item = await ch.get() 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 
       result = item['data'] 

     self.write(result) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    print 'running' 
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') 
    server = tornado.httpserver.HTTPServer(application) 
    server.bind(8888) 
    # zero means creating as many processes as there are cores. 
    server.start(0) 
    tornado.ioloop.IOLoop.instance().start()