2015-12-15 13 views
11

Próbuję napisać narzędzie do przesyłania danych wsadowych za pomocą Akka HTTP 2.0-M2. Ale ja obliczu akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.Jak poprawnie połączyć się z klientem sieciowym Akka dla wielu żądań (10k - 100k)?

Starałem się wyizolować problem i tutaj jest przykładowy kod, który nie powiedzie się również:

public class TestMaxRequests { 
    private static final class Router extends HttpApp { 
     @Override 
     public Route createRoute() { 
      return route(
        path("test").route(
          get(handleWith(ctx -> ctx.complete("OK"))) 
        ) 
      ); 
     } 
    } 


    public static void main(String[] args) { 
     ActorSystem actorSystem = ActorSystem.create(); 
     Materializer materializer = ActorMaterializer.create(actorSystem); 

     Router router = new Router(); 
     router.bindRoute("127.0.0.1", 8082, actorSystem); 

     LoggingAdapter log = Logging.getLogger(actorSystem, new Object()); 

     for (int i = 0; i < 100; i++) { 
      final int reqNum = i; 
      Http.get(actorSystem).singleRequest(HttpRequest.create().withUri("http://127.0.0.1:8082/test"), materializer) 
        .onComplete(new OnComplete<HttpResponse>() { 
         @Override 
         public void onComplete(Throwable failure, HttpResponse response) throws Throwable { 
          if (failure != null) { 
           log.error(failure, "Failed: {}", reqNum); 
          } else { 
           log.info("Success: {}, consuming stream...", reqNum); 
           response.entity().getDataBytes().runWith(Sink.ignore(), materializer); 
           log.info("Success: {}, consumed stream", reqNum); 
          } 
         } 
        }, actorSystem.dispatcher()); 
     } 
    } 
} 

nie powiedzie się z:

[2015-12-15 16:17:32,609] [ INFO] [] [] a.e.s.Slf4jLogger: Slf4jLogger started 
[2015-12-15 16:17:32,628] [ DEBUG] [main] [EventStream(akka://default)] a.e.EventStream: logger log1-Slf4jLogger started 
[2015-12-15 16:17:32,636] [ DEBUG] [main] [EventStream(akka://default)] a.e.EventStream: Default Loggers started 
[2015-12-15 16:17:33,531] [ DEBUG] [spatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] a.i.TcpListener: Successfully bound to /127.0.0.1:8082 
[2015-12-15 16:17:33,624] [ DEBUG] [spatcher-7] [akka://default/user/PoolInterfaceActor-0] a.h.i.e.c.PoolInterfaceActor: (Re-)starting host connection pool to 127.0.0.1:8082 
[2015-12-15 16:17:33,736] [ DEBUG] [spatcher-8] [akka://default/user/SlotProcessor-0] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,748] [ DEBUG] [patcher-11] [akka://default/user/SlotProcessor-3] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,758] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-2] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,762] [ DEBUG] [spatcher-9] [akka://default/user/SlotProcessor-1] a.h.i.e.c.PoolSlot$SlotProcessor: become unconnected, from subscriber pending 
[2015-12-15 16:17:33,779] [ ERROR] [patcher-11] [Object(akka://default)] j.l.Object: Failed: 36 
akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] 
    at akka.http.impl.engine.client.PoolInterfaceActor$$anonfun$receive$1.applyOrElse(PoolInterfaceActor.scala:120) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[akka-actor_2.11-2.4.0.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorSubscriber$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.stream.actor.ActorSubscriber$class.aroundReceive(ActorSubscriber.scala:201) ~[akka-stream-experimental_2.11-2.0-M2.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorPublisher$$super$aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:309) ~[akka-stream-experimental_2.11-2.0-M2.jar:na] 
    at akka.http.impl.engine.client.PoolInterfaceActor.aroundReceive(PoolInterfaceActor.scala:48) ~[akka-http-core-experimental_2.11-2.0-M2.jar:na] 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.actor.ActorCell.invoke(ActorCell.scala:494) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) [akka-actor_2.11-2.4.0.jar:na] 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [akka-actor_2.11-2.4.0.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na] 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na] 
[2015-12-15 16:17:33,780] [ ERROR] [patcher-20] [Object(akka://default)] j.l.Object: Failed: 48 

myślę, że to dlatego, że próbuję stworzyć wiele kontraktów Futures i wykonać je wszystkie naraz. Ale czy Akka nie powinna włączać przeciwciśnienia? Myślę, że używam tego źle. Próbowałem metod SuperPool, ale nic się nie zmieniło, ponieważ, jak rozumiem, Http.singleRequest ma tę samą pulę w środku. Próbowałem też ponownie użyć instancji Http zamiast wywoływać w pętli Http.get(), ale też nie pomogło.

Jaki jest prawidłowy sposób wypalania partii zgłoszeń? Planuję zrealizować partie od 10 000 do 100 000 wniosków.

Odpowiedz

13

Akka absolutnie umożliwia przeciwciśnienie, po prostu nie wykorzystujesz go. Zamiast wysyłać wiele pojedynczych żądań, możesz użyć pojedynczego Flow, aby przesłać wszystkie swoje prośby. Z documentation:

final Flow<HttpRequest, HttpResponse, Future<OutgoingConnection>> connectionFlow = 
    Http.get(actorSystem).outgoingConnection("127.0.0.1", 8082); 

Następnie można użyć tego przepływu na przetwarzanie HttpRequest obiekty:

HttpRequest req = HttpRequest.GET("/test") 

//imitates your for-loop example of 100 requests 
Source.from(() -> Collections.nCopies(100, req).iterator()) 
     .via(connectionFlow) 
     .runForeach(...) 
+0

Tęskniłem część wykorzystaniem wielu żądań w Source.from()! Dzięki!! – relgames

+0

@relgames Serdecznie zapraszamy. Szczęśliwy hacking! –

+0

@RamonJRomeroyVigil Jak używać przepływu z przeciwciśnieniem, jeśli nie mogę z góry utworzyć wiązki żądań? Załóżmy na przykład, że żądam czegoś na podstawie identyfikatorów zwróconych w jakimś paginowanym interfejsie API. Dlatego chcę używać przepływu podczas przetwarzania odpowiedzi z poprzednich żądań. – expert