2015-08-25 30 views
12

Mam instalatora, który używa Tornado jako serwer http i niestandardowe ramy http. Pomysł polega na tym, aby mieć jednego handlarza tornadami, a każdy otrzymany wniosek powinien zostać przesłany pod numer ThreadPoolExecutor i pozostawić Tornado, aby móc wysłuchać nowych wniosków. Gdy wątek zakończy przetwarzanie żądania, wywoływane jest wywołanie zwrotne, które wysyła odpowiedź do klienta w tym samym wątku, w którym wykonywana jest pętla IO.Tornado z ThreadPoolExecutor

Pozbawiony, kod wygląda mniej więcej tak. Baza klasy serwer http:

class HttpServer(): 
    def __init__(self, router, port, max_workers): 
     self.router = router 
     self.port = port 
     self.max_workers = max_workers 

    def run(self): 
     raise NotImplementedError() 

Tornado backed realizacja HTTPServer:

class TornadoServer(HttpServer): 
    def run(self): 
     executor = futures.ThreadPoolExecutor(max_workers=self.max_workers) 

     def submit(callback, **kwargs): 
      future = executor.submit(Request(**kwargs)) 
      future.add_done_callback(callback) 
      return future 

     application = web.Application([ 
      (r'(.*)', MainHandler, { 
       'submit': submit, 
       'router': self.router 
      }) 
     ]) 

     application.listen(self.port) 

     ioloop.IOLoop.instance().start() 

Główny treser, który obsługuje wszystkie żądania tornado (realizowane tylko GET, ale inny byłby taki sam):

class MainHandler(): 
    def initialize(self, submit, router): 
     self.submit = submit 
     self.router = router 

    def worker(self, request): 
     responder, kwargs = self.router.resolve(request) 
     response = responder(**kwargs) 
     return res 

    def on_response(self, response): 
     # when this is called response should already have result 
     if isinstance(response, Future): 
      response = response.result() 
     # response is my own class, just write returned content to client 
     self.write(response.data) 
     self.flush() 
     self.finish() 

    def _on_response_ready(self, response): 
     # schedule response processing in ioloop, to be on ioloop thread 
     ioloop.IOLoop.current().add_callback(
      partial(self.on_response, response) 
     ) 

    @web.asynchronous 
    def get(self, url): 
     self.submit(
      self._on_response_ready, # callback 
      url=url, method='post', original_request=self.request 
     ) 

Serwer jest uruchamiany z czymś podobnym:

router = Router() 
server = TornadoServer(router, 1111, max_workers=50) 
server.run() 

Tak więc, jak widać, główny handler przesyła wszystkie żądania do puli wątków i po zakończeniu przetwarzania wywoływane jest wywołanie zwrotne (_on_response_ready), które właśnie planuje zakończenie żądania w pętli IO (aby upewnić się, że jest to zrobione na tym samym wątku, w którym wykonywana jest pętla IO).

To działa. Przynajmniej tak to wygląda.

Moim problemem tutaj jest wydajność w odniesieniu do max pracowników w ThreadPoolExecutor.

Wszystkie procedury obsługi są powiązane z IO, nie ma żadnych obliczeń (najczęściej czekają one na DB lub usługi zewnętrzne), więc przy 50 pracownikach oczekiwałbym, że 50 wniosków o współdziałanie zakończyłoby się około 50 razy szybciej niż 50 żądań ze stawkami tylko jeden pracownik.

Ale tak nie jest. Widzę niemal identyczne żądania na sekundę, gdy mam 50 pracowników w puli wątków i 1 pracownika.

pomiarowych, Użyłem Apache-Bench z czymś takim:

ab -n 100 -c 10 http://localhost:1111/some_url 

Czy ktoś ma pomysł co robię źle? Czy źle zrozumiałem, jak działa Tornado lub ThreadPool? Lub połączenie?

+0

Ten kod wygląda mniej lub bardziej poprawnie. Jak dokładnie 50 pracowników wykonuje swoje operacje wejścia/wyjścia? Ile widzisz QPS? Czy przetwarzanie HTTP w wątku IOLoop może być wąskim gardłem? –

+0

@BenDarnell, I/O w robotach przeważnie wysyła zapytanie do bazy danych i wywołuje usługi zewnętrzne. Przetwarzanie HTTP w IOLoop jest dość minimalne (pomiar pokazuje, że zajmuje dużo mniej czasu niż rzeczywiste przetwarzanie (1 lub 2 procenty) .Jeśli QPS oznacza Zapytania na sekundę, gdzie Zapytanie jest zapytaniem DB, to jedno lub dwa zapytania są wykonywane na żądanie HTTP. –

+0

Jakie biblioteki używasz do zapytań do bazy danych Czy może być biblioteka C, która nie zwalnia GIL podczas blokowania na I/O? –

Odpowiedz

0

Opakowanie momoko dla postgres rozwiązuje ten problem, zgodnie z sugestią kwarunek. Jeśli chcesz poprosić o dalsze porady dotyczące debugowania od zewnętrznych współpracowników, pomoże to opublikować dzienniki debugowania znacznika czasu z zadania testowego, które wykonuje tryb uśpienia (10) przed każdym dostępem do bazy danych.