Ok, więc tutaj jest mój przykład, jak byłoby to zrobić z żądań GET.
Dodałem dwa główne komponenty:
Pierwszy jest prosty gwintowany PubSub słuchacz, który dodaje nowe wiadomości do lokalnej listy obiektów. Dodałem także listę akcesorów do klasy, dzięki czemu możesz czytać z wątku odsłuchiwania tak, jakbyś czytał ze zwykłej listy. Jeśli chodzi o twój numer WebRequest
, właśnie czytasz dane z lokalnego obiektu listy. Ta funkcja zwraca natychmiast i nie blokuje akceptowania i przetwarzania bieżących żądań dotyczących wypełniania lub przyszłych żądań.
class OpenChannel(threading.Thread):
def __init__(self, channel, host = None, port = None):
threading.Thread.__init__(self)
self.lock = threading.Lock()
self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channel)
self.output = []
# lets implement basic getter methods on self.output, so you can access it like a regular list
def __getitem__(self, item):
with self.lock:
return self.output[item]
def __getslice__(self, start, stop = None, step = None):
with self.lock:
return self.output[start:stop:step]
def __str__(self):
with self.lock:
return self.output.__str__()
# thread loop
def run(self):
for message in self.pubsub.listen():
with self.lock:
self.output.append(message['data'])
def stop(self):
self._Thread__stop()
Druga to klasa ApplicationMixin. Jest to obiekt wtórny, w którym dziedziczy się klasa żądań stron w celu dodania funkcjonalności i atrybutów. W tym przypadku sprawdza, czy detektor kanałów już istnieje dla żądanego kanału, tworzy jedną, jeśli żadna nie została znaleziona, i zwraca uchwyt odbiornika do WebRequest.
# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
def GetChannel(self, channel, host = None, port = None):
if channel not in self.application.channels:
self.application.channels[channel] = OpenChannel(channel, host, port)
self.application.channels[channel].start()
return self.application.channels[channel]
WebRequest klasa teraz traktuje słuchacza tak, jakby to była lista statyczne (pamiętając, że trzeba dać self.write
ciąg)
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
@tornado.web.asynchronous
def get(self, channel):
# get the channel
channel = self.GetChannel(channel)
# write out its entire contents as a list
self.write('{}'.format(channel[:]))
self.finish() # not necessary?
Wreszcie, po aplikacji jest tworzony, dodałem pusta słowniku jako atrybut
# add a dictionary containing channels to your application
application.channels = {}
jak również niektóre porządki z uruchomionych wątków, po wyjściu z aplikacji
# clean up the subscribed channels
for channel in application.channels:
application.channels[channel].stop()
application.channels[channel].join()
Kompletny kod:
import threading
import redis
import tornado.web
class OpenChannel(threading.Thread):
def __init__(self, channel, host = None, port = None):
threading.Thread.__init__(self)
self.lock = threading.Lock()
self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channel)
self.output = []
# lets implement basic getter methods on self.output, so you can access it like a regular list
def __getitem__(self, item):
with self.lock:
return self.output[item]
def __getslice__(self, start, stop = None, step = None):
with self.lock:
return self.output[start:stop:step]
def __str__(self):
with self.lock:
return self.output.__str__()
# thread loop
def run(self):
for message in self.pubsub.listen():
with self.lock:
self.output.append(message['data'])
def stop(self):
self._Thread__stop()
# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
def GetChannel(self, channel, host = None, port = None):
if channel not in self.application.channels:
self.application.channels[channel] = OpenChannel(channel, host, port)
self.application.channels[channel].start()
return self.application.channels[channel]
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
@tornado.web.asynchronous
def get(self, channel):
# get the channel
channel = self.GetChannel(channel)
# write out its entire contents as a list
self.write('{}'.format(channel[:]))
self.finish() # not necessary?
class GetHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello world")
application = tornado.web.Application([
(r"/", GetHandler),
(r"/channel/(?P<channel>\S+)", ReadChannel),
])
# add a dictionary containing channels to your application
application.channels = {}
if __name__ == '__main__':
application.listen(8888)
print 'running'
try:
tornado.ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
pass
# clean up the subscribed channels
for channel in application.channels:
application.channels[channel].stop()
application.channels[channel].join()
Redis pub/sub nie powinien być stosowany w 'web.RequestHandler', ponieważ będzie blokować ioloop czekając na' PubSub .listen() '. Zajrzyj do http://tornadogists.org/532067/ dla przykładu działania websocket. –
Websocket jest dobrym rozwiązaniem, jednak moja aplikacja wymaga pracy w przeglądarkach, które nie obsługują websockets. Używam długiego głosowania. Z tego powodu potrzebuję "async get". –
@HelieelsonSantos w takim przypadku najlepiej jest zachować lokalny stan historii zasubskrybowanego kanału (podawanego przez osobny wątek), a następnie zapisać ten stan natychmiast do odpowiedzi i wykonać operację 'get'. Klient powinien utrzymywać rekordy ostatnio uzyskanego indeksu lub ostatniego czasu uzyskania itp., Co pozwala zachować ciągłość dla różnych klientów. Napiszę odpowiedź z przykładem za kilka godzin, kiedy znajdę czas. –