2017-03-25 42 views
5

odnosząc się do poniższego realizacji wymienionych w:Czy łączenie połączeń w akka-http przy użyciu wątku źródłowego implementacji jest bezpieczne?

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io") 
val queue = 
    Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew) 
    .via(poolClientFlow) 
    .toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 
    .run() 

Czy wątku bezpieczne zaoferować kolejki żądań HTTP z wielu wątków? Jeśli tak nie jest, jaki jest najlepszy sposób na wdrożenie takiego wymogu? może używając dedykowanego aktora?

Odpowiedz

1

Nie, to nie jest wątku bezpieczne, jak na api doc: SourceQueue that current source is materialized to is for single thread usage only.

Dedykowany aktor będzie działać dobrze, ale jeśli można, stosując Source.actorRef (doc link) zamiast Source.queue byłoby łatwiejsze.

Zasadniczo wadą modelu Source.actorRef jest brak przeciwciśnienia, ale w przypadku korzystania z urządzenia OverflowStrategy.dropNew jasne jest, że nie oczekuje się przeciwciśnienia. Jako takie, można uzyskać to samo zachowanie przy użyciu Source.actorRef.

+0

Dziękuję bardzo za komentarz. Mam wymóg sygnalizowania jakiegoś błędu w przypadku przepełnienia bufora, np. Return Future.failed (BufferFlowException), który według mojego zrozumienia nie może zostać zaimplementowany przy użyciu Source.actorRef. Kolejka źródłowa pasuje do opisu za pomocą interfejsu API QueueOfferResult. –

+0

@Nik Nadal uważam, że odpowiedziałem prawidłowo na oryginalne pytanie. Jeśli chodzi o twoje nowe wymagania, wtedy naprawdę zaimplementowałbym aktora specjalizującego się w obsłudze kolejki. Musiałby dodać do kolejki za pomocą 'queue.offer (???) pipeTo self', a następnie byłby w stanie zareagować na niepowodzenie, posługując się różnymi podtypami' QueueOfferResult' w swojej metodzie 'receive'. –

+0

Aby zapobiec przepełnieniu skrzynki pocztowej aktora, możesz użyć 'NonBlockingBoundedMailbox' (http://doc.akka.io/docs/akka/current/scala/mailboxes.html). – khiramatsu

1

Jak poprawnie podano przez @ frederic-a, SourceQueue nie jest rozwiązaniem bezpiecznym dla wątków.

Być może pasującym rozwiązaniem byłoby użycie MergeHub (więcej szczegółów można znaleźć w artykule docs). Pozwala to skutecznie uruchomić wykres w dwóch etapach.

  1. z koncentratora do zlewu (ta materializuje do zlewu)
  2. rozpowszechniać zlew zmaterializowane w pkt 1 do użytkowników. Sink s są rzeczywiście zaprojektowane do dystrybucji, więc jest to całkowicie bezpieczne.

Takie rozwiązanie byłoby bezpieczne ciśnienie wsteczne mądry, jak na MergeHub zachowania

Jeżeli konsument nie może nadążyć następnie wszystkich producentów są backpressured.

przykład kodu poniżej:

val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16) 
    .via(poolClientFlow) 
    .toMat(Sink.foreach({ 
    case ((Success(resp), p)) => p.success(resp) 
    case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 
    .run() 

// on the user threads 

val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ??? 
source.runWith(reqSink)