Mam instalatora, który używa Tornado
jako serwer http i niestandardowe ramy http. Pomysł polega na tym, aby mieć jednego handlarza tornadami, a każdy otrzymany wniosek powinien zostać przesłany pod numer ThreadPoolExecutor
i pozostawić Tornado
, aby móc wysłuchać nowych wniosków. Gdy wątek zakończy przetwarzanie żądania, wywoływane jest wywołanie zwrotne, które wysyła odpowiedź do klienta w tym samym wątku, w którym wykonywana jest pętla IO.Tornado z ThreadPoolExecutor
Pozbawiony, kod wygląda mniej więcej tak. Baza klasy serwer http:
class HttpServer():
def __init__(self, router, port, max_workers):
self.router = router
self.port = port
self.max_workers = max_workers
def run(self):
raise NotImplementedError()
Tornado backed realizacja HTTPServer:
class TornadoServer(HttpServer):
def run(self):
executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)
def submit(callback, **kwargs):
future = executor.submit(Request(**kwargs))
future.add_done_callback(callback)
return future
application = web.Application([
(r'(.*)', MainHandler, {
'submit': submit,
'router': self.router
})
])
application.listen(self.port)
ioloop.IOLoop.instance().start()
Główny treser, który obsługuje wszystkie żądania tornado (realizowane tylko GET, ale inny byłby taki sam):
class MainHandler():
def initialize(self, submit, router):
self.submit = submit
self.router = router
def worker(self, request):
responder, kwargs = self.router.resolve(request)
response = responder(**kwargs)
return res
def on_response(self, response):
# when this is called response should already have result
if isinstance(response, Future):
response = response.result()
# response is my own class, just write returned content to client
self.write(response.data)
self.flush()
self.finish()
def _on_response_ready(self, response):
# schedule response processing in ioloop, to be on ioloop thread
ioloop.IOLoop.current().add_callback(
partial(self.on_response, response)
)
@web.asynchronous
def get(self, url):
self.submit(
self._on_response_ready, # callback
url=url, method='post', original_request=self.request
)
Serwer jest uruchamiany z czymś podobnym:
router = Router()
server = TornadoServer(router, 1111, max_workers=50)
server.run()
Tak więc, jak widać, główny handler przesyła wszystkie żądania do puli wątków i po zakończeniu przetwarzania wywoływane jest wywołanie zwrotne (_on_response_ready
), które właśnie planuje zakończenie żądania w pętli IO (aby upewnić się, że jest to zrobione na tym samym wątku, w którym wykonywana jest pętla IO).
To działa. Przynajmniej tak to wygląda.
Moim problemem tutaj jest wydajność w odniesieniu do max pracowników w ThreadPoolExecutor.
Wszystkie procedury obsługi są powiązane z IO, nie ma żadnych obliczeń (najczęściej czekają one na DB lub usługi zewnętrzne), więc przy 50 pracownikach oczekiwałbym, że 50 wniosków o współdziałanie zakończyłoby się około 50 razy szybciej niż 50 żądań ze stawkami tylko jeden pracownik.
Ale tak nie jest. Widzę niemal identyczne żądania na sekundę, gdy mam 50 pracowników w puli wątków i 1 pracownika.
pomiarowych, Użyłem Apache-Bench z czymś takim:
ab -n 100 -c 10 http://localhost:1111/some_url
Czy ktoś ma pomysł co robię źle? Czy źle zrozumiałem, jak działa Tornado lub ThreadPool? Lub połączenie?
Ten kod wygląda mniej lub bardziej poprawnie. Jak dokładnie 50 pracowników wykonuje swoje operacje wejścia/wyjścia? Ile widzisz QPS? Czy przetwarzanie HTTP w wątku IOLoop może być wąskim gardłem? –
@BenDarnell, I/O w robotach przeważnie wysyła zapytanie do bazy danych i wywołuje usługi zewnętrzne. Przetwarzanie HTTP w IOLoop jest dość minimalne (pomiar pokazuje, że zajmuje dużo mniej czasu niż rzeczywiste przetwarzanie (1 lub 2 procenty) .Jeśli QPS oznacza Zapytania na sekundę, gdzie Zapytanie jest zapytaniem DB, to jedno lub dwa zapytania są wykonywane na żądanie HTTP. –
Jakie biblioteki używasz do zapytań do bazy danych Czy może być biblioteka C, która nie zwalnia GIL podczas blokowania na I/O? –