2016-01-06 16 views
34

Próbuję użyć protokołu HTTP Akka do podstawowego uwierzytelnienia mojego żądania. Tak się składa, że ​​mam zewnętrzny zasób umożliwiający uwierzytelnienie, więc muszę wykonać reszę do tego zasobu.Akka HTTP: Blokowanie w przyszłości blokuje serwer

Zajmuje to trochę czasu i podczas przetwarzania wydaje się, że reszta mojego interfejsu API jest zablokowana, czekając na to połączenie. Mam powielana to z bardzo prostego przykładu:

// used dispatcher: 
implicit val system = ActorSystem() 
implicit val executor = system.dispatcher 
implicit val materializer = ActorMaterializer() 


val routes = 
    (post & entity(as[String])) { e => 
    complete { 
     Future{ 
     Thread.sleep(5000) 
     e 
     } 
    } 
    } ~ 
    (get & path(Segment)) { r => 
    complete { 
     "get" 
    } 
    } 

Gdybym dodawać do punktu końcowego dziennika, moja get końcowy jest również zatrzymany czeka na 5 sekund, których punktem końcowym dziennika dyktowanych.

Czy to oczekiwane zachowanie, a jeśli tak, to w jaki sposób mogę wykonywać operacje blokowania bez blokowania mojego całego interfejsu API?

Odpowiedz

118

oczekuje Co można zaobserwować zachowanie - ale oczywiście to jest bardzo złe. Dobrze, że znane są znane rozwiązania i najlepsze praktyki, aby się przed tym uchronić. W tej odpowiedzi chciałbym poświęcić trochę czasu, aby wyjaśnić kwestię krótko, długo, a następnie dogłębnie - cieszyć się czytaniem!

Krótka odpowiedź: „nie blokują infrastruktury routingu”, zawsze używać dedykowanego dla dyspozytora operacji blokowania!

Przyczyna zaobserwowanego objawu: Problem polega na tym, że używasz context.dispatcher jako dyspozytora, w którym wykonywane są blokady na kontraktach futures. Ten sam program rozsyłający (który w uproszczeniu jest po prostu "wiązką wątków") jest używany przez infrastrukturę routingu do obsługi otrzymywanych żądań - tak więc, jeśli zablokujesz wszystkie dostępne wątki, kończysz z głodu infrastrukturą routingu. (Rzeczą do debaty i analizy porównawczej jest to, że protokół Akka HTTP może ochronić przed tym, dodam to do mojej listy badań).

Blokowanie musi być traktowane ze szczególną ostrożnością, aby nie wpływać na innych użytkowników tego samego programu rozsyłającego (dlatego tak łatwo jest rozdzielić wykonanie na inne), jak wyjaśniono w sekcji Akka docs: Blocking needs careful management.

Coś jeszcze chciałem zwrócić uwagę jest to, że trzeba uniknąć blokowania API w ogóle, jeśli to możliwe - jeśli długa operacja bieganie nie jest tak naprawdę jedna operacja, ale jego serii, mogłeś oddzielić tych na inny aktorzy lub sekwencyjne futures. W każdym razie, chciałem tylko wskazać - jeśli to możliwe, unikać takich blokujących połączeń, ale jeśli musisz - poniższe wyjaśnienia wyjaśniają, jak poprawnie sobie z nimi poradzić.

Dogłębna analiza i rozwiązań:

Teraz, gdy wiemy, co jest złe, koncepcyjnie, rzućmy okiem, co dokładnie jest uszkodzony w powyższym kodzie i jak dobrym rozwiązaniem tego problemu wygląda :

Kolor stan = gwint:

  • turkusowy - SYPIALNA
  • pomarańczowy - oczekiwanie
  • zielony - RUNNABLE

Zbadajmy teraz 3 fragmenty kodu i ich wpływ na dyspozytorów oraz wydajność aplikacji. Aby wymusić to zachowanie aplikacja została umieścić pod następującym obciążeniu:

  • [a] utrzymać zainteresowanie żądania GET (patrz wyżej kodu w początkowym pytaniu o to), to nie blokuje tam
  • [b], a następnie po póki ogień 2000 żądania POST, co spowoduje zablokowanie 5second przed powrotem przyszłego

1) [bad] Dispatcher zachowanie na zły kod:

// BAD! (due to the blocking in Future): 
implicit val defaultDispatcher = system.dispatcher 

val routes: Route = post { 
    complete { 
    Future { // uses defaultDispatcher 
     Thread.sleep(5000)     // will block on the default dispatcher, 
     System.currentTimeMillis().toString // starving the routing infra 
    } 
    } 
} 

Wystawiamy więc naszą aplikację do [a] obciążenia, a już widać kilka wątków akka.actor.default-dispatcher - obsługują one żądania - mały zielony fragment i pomarańczowy, co oznacza, że ​​pozostałe są bezczynne tam.

blocking is killing the default dispatcher

Następnie rozpoczynamy [b] obciążenia, co powoduje blokowanie tych wątków - można zobaczyć wczesny wątek „default-dyspozytor-2,3,4” Weszli do blokowania po bezczynności wcześniej. Obserwujemy również, że pula rośnie - nowe wątki są uruchamiane "default-dispatcher-18,19,20,21 ...", ale od razu idą spać (!) - marnujemy cenny zasób tutaj!

Liczba takich uruchomionych wątków zależy od domyślnej konfiguracji modułu rozsyłającego, ale prawdopodobnie nie przekroczy ona 50 lub więcej. Ponieważ właśnie wystrzeliliśmy 2k blokujących ops, zagłodzimy cały wątek - operacje blokujące dominują tak, że infra routingu nie ma dostępnego wątku do obsługi innych żądań - bardzo źle!

Zróbmy coś z tym zrobić (co jest najlepszą praktyką Akka btw - zawsze izolować zachowanie blokowania jak pokazano poniżej):

2) [good!] Dyspozytor zachowania dobrych strukturyzowane/kod dyspozytorów:

W swojej application.conf configure to dyspozytor dedykowany do blokowania:

my-blocking-dispatcher { 
    type = Dispatcher 
    executor = "thread-pool-executor" 
    thread-pool-executor { 
    // in Akka previous to 2.4.2: 
    core-pool-size-min = 16 
    core-pool-size-max = 16 
    max-pool-size-min = 16 
    max-pool-size-max = 16 
    // or in Akka 2.4.2+ 
    fixed-pool-size = 16 
    } 
    throughput = 100 
} 

Powinieneś przeczytać więcej wDokumentacja, aby zrozumieć różne opcje tutaj. Główną kwestią jest to, że wybraliśmy ThreadPoolExecutor, który ma twardy limit wątków, które są dostępne dla operacji blokujących. Ustawienia rozmiaru zależą od tego, co robi twoja aplikacja i od liczby rdzeni twojego serwera.

Następnie musimy go używać, zamiast domyślnego:

// GOOD (due to the blocking in Future): 
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher") 

val routes: Route = post { 
    complete { 
    Future { // uses the good "blocking dispatcher" that we configured, 
      // instead of the default dispatcher – the blocking is isolated. 
     Thread.sleep(5000) 
     System.currentTimeMillis().toString 
    } 
    } 
} 

Mamy ciśnienie aplikacji przy użyciu tego samego obciążenia, najpierw trochę normalnych wniosków, a następnie dodamy te blokujące. To, w jaki sposób ThreadPools będzie zachowywać się w tym przypadku:

the blocking pool scales to our needs

Więc początkowo normalne żądania są łatwo obsługiwane przez domyślną dyspozytora, można zobaczyć kilka zielone linie tam - to rzeczywiste wykonanie (nie jestem bardzo obciążając serwer, więc jest to głównie bezczynne).

Teraz, gdy zaczynamy wydawać blokujące operacje, my-blocking-dispatcher-* rozpoczyna się i uruchamia się do liczby skonfigurowanych wątków. Obsługuje tam wszystkie Śpiące. Ponadto, po pewnym czasie nic nie dzieje się na tych wątkach, zamyka je. Gdybyśmy trafili na serwer z inną grupą blokującą pulę, uruchomiliby nowe wątki, które zajmą się snem() - ale w międzyczasie - nie marnujemy naszych cennych wątków na "po prostu zostańmy tam i nic nie robić".

Podczas korzystania z tej konfiguracji nie wpłynęło to na przepustowość normalnych żądań GET, ale nadal były one z powodzeniem obsługiwane przez (wciąż całkiem darmowy) domyślny program rozsyłający.

Jest to zalecany sposób radzenia sobie z każdym rodzajem blokowania w aplikacjach reaktywnych. Często określa się je jako "masowe" (lub "izolowanie") złe zachowanie aplikacji, w tym przypadku złe zachowanie polega na zasypianiu/blokowaniu.

3) [workaround-ish] Dyspozytor zachowanie podczas blocking stosowane prawidłowo:

W tym przykładzie używamy metody scaladoc for scala.concurrent.blocking które mogą pomóc w obliczu blokujących ops. Generalnie powoduje, że więcej wątków jest odwirowywanych, aby przetrwać operacje blokujące.

// OK, default dispatcher but we'll use `blocking` 
implicit val dispatcher = system.dispatcher 

val routes: Route = post { 
    complete { 
    Future { // uses the default dispatcher (it's a Fork-Join Pool) 
     blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
       // but at the cost of exploding the number of threads (which eventually 
       // may also lead to starvation problems, but on a different layer) 
     Thread.sleep(5000) 
     System.currentTimeMillis().toString 
     } 
    } 
    } 
} 

Aplikacja będzie zachowywać się tak:

blocking causes more threads to be started

Zauważysz, że LOT nowych wątków są tworzone, to dlatego blokuje podpowiedzi w „Och, to będziesz być blokowane, więc potrzebujemy więcej wątków ". To powoduje, że całkowity czas, w którym jesteśmy zablokowani, jest mniejszy niż w przykładzie 1, jednak wtedy setki wątków nic nie robią po zakończeniu operacji blokowania ... Oczywiście, w końcu zostaną zamknięte (FJP robi to), ale przez pewien czas będziemy mieć dużą (niekontrolowaną) ilość wątków, w przeciwieństwie do 2), gdzie dokładnie wiemy, ile wątków dedykujemy dla zachowań blokujących.

Reasumując: Nigdy nie blokuje domyślnego dyspozytora :-)

Najlepszą praktyką jest stosowanie wzoru przedstawionego w 2), aby mieć dyspozytora dla operacji blokujących dostępnych i wykonać je tam.

Mam nadzieję, że to pomoże, happy hakking!

Omówiono wersja HTTP Akka: 2.0.1

Profiler używane: Wiele osób pytało mnie w odpowiedzi na to odpowiedź prywatnie co profiler Kiedyś wizualizację stanów wątków w powyższych pics, więc dodanie tej informacji tutaj: Użyłem YourKit, który jest niesamowitym profilerem komercyjnym (darmowym dla OSS), ale możesz osiągnąć te same wyniki za pomocą bezpłatnego VisualVM from OpenJDK.

+1

Zawarliśmy tę odpowiedź w ramach oficjalnej dokumentacji: http://doc.akka.io/docs/akka/2.4/scala/http/handling-blocking-operations-in-akka-http-routes.html#handling -blocking-in-http-routes-scala –

+0

Powyższy link jest uszkodzony. –

+0

Co jeśli chcę zwrócić odpowiedź i pracować dalej w tle? [This] (https://gist.github.com/asarkar/37e4cb026c463f6334617e923cfc4b12) wydaje się działać. –

3

Dziwne, ale dla mnie wszystko działa dobrze (bez blokowania). Oto kod:

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.server.Route 
import akka.stream.ActorMaterializer 

import scala.concurrent.Future 


object Main { 

    implicit val system = ActorSystem() 
    implicit val executor = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val routes: Route = (post & entity(as[String])) { e => 
    complete { 
     Future { 
     Thread.sleep(5000) 
     e 
     } 
    } 
    } ~ 
    (get & path(Segment)) { r => 
     complete { 
     "get" 
     } 
    } 

    def main(args: Array[String]) { 

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure { 
     case e => 
     system.shutdown() 
    } 
    } 
} 

Ponadto można owinąć cię asynchroniczny kod do onComplete lub onSuccess dyrektywy:

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)} 
+0

Tak, to samo tutaj. Właśnie przetestowałem to z akka-http 2.0.1 – expert

+0

również możesz spróbować opakować Future w dyrektywę onComplete/onSuccess. –