2016-09-11 38 views
6

Chciałbym połączyć żądanie http za pomocą akka-http-client jako Stream. Każde żądanie http w łańcuchu zależy od sukcesu/odpowiedzi poprzednich żądań i używa go do skonstruowania nowego żądania. Jeśli żądanie nie powiedzie się, Stream powinien zwrócić odpowiedź odrzuconego żądania.Łańcuch żądań Akka-http-client w strumieniu

Jak mogę skonstruować taki strumień w pliku akka-http? jakiego interfejsu API na poziomie akka-http powinienem używać?

+0

Co powinien zmaterializować strumień, jeśli wszystkie wnioski zakończą się powodzeniem? Prawdopodobnie chcesz zdobyć trochę danych z nich? – Mikesname

+0

Tak, chciałbym uzyskać HTTPResponse na końcu strumienia; po zmaterializowaniu chciałbym się dowiedzieć, czy był to sukces porażki i powód niepowodzenia. – Rabzu

Odpowiedz

8

Jeśli tworzysz robota sieciowego, spójrz na numer this post. Ta odpowiedź zajmuje się prostszym przypadkiem, takim jak pobieranie stronicowanych zasobów, gdzie link do następnej strony znajduje się w nagłówku bieżącej odpowiedzi strony.

Można utworzyć łańcuchowe źródło - tam, gdzie jeden element prowadzi do następnego - przy użyciu metody Source.unfoldAsync. W tym celu funkcja pobiera element S i zwraca Future[Option[(S, E)]], aby określić, czy strumień powinien nadal emitować elementy typu E, przekazując stan do następnego wywołania.

W twoim przypadku, to jest coś w rodzaju:

  1. biorąc wstępną HttpRequest
  2. wytwarzając Future[HttpResponse]
  3. jeśli punkty odpowiedzią na inny adres URL, wracając Some(request -> response), inaczej None

Jednak, istnieje zmarszczka, która polega na tym, że Nie wysyłaj odpowiedzi ze strumienia, jeśli nie zawiera wskaźnika do następnego żądania.

Aby obejść ten problem, można przekazać funkcję unfoldAsync return Future[Option[(Option[HttpRequest], HttpResponse)]]. To pozwala na obsługę następujących sytuacjach:

  • obecna odpowiedź jest błąd
  • aktualne punkty odpowiedzi na inne żądanie
  • obecna reakcja nie wskazuje na inne żądanie

Poniżej znajduje się kod z adnotacjami opisujący to podejście, ale najpierw wstępny:

Podczas przesyłania strumieniowego żądań HTTP do odpowiedzi w Strumienie Akka, musisz upewnić się, że ciało odpowiedzi zostanie skonsumowane, w przeciwnym razie wydadzą się złe rzeczy (zakleszczenia itp.). Jeśli nie potrzebujesz ciała, możesz je zignorować, ale tutaj używamy funkcji konwersji HttpEntity z (potencjalne) strumień do ścisłego jednostki:

import scala.concurrent.duration._ 

def convertToStrict(r: HttpResponse): Future[HttpResponse] = 
    r.entity.toStrict(10.minutes).map(e => r.withEntity(e)) 

Następnie kilka funkcji do tworzenia Option[HttpRequest] z HttpResponse. Ten przykład wykorzystuje schemat jak linki paginacji GitHub, gdzie nagłówek Links zawiera, na przykład: <https://api.github.com/...> rel="next":

def nextUri(r: HttpResponse): Seq[Uri] = for { 
    linkHeader <- r.header[Link].toSeq 
    value <- linkHeader.values 
    params <- value.params if params.key == "rel" && params.value() == "next" 
} yield value.uri 

def getNextRequest(r: HttpResponse): Option[HttpRequest] = 
    nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next)) 

Dalej, prawdziwą funkcję będziemy przechodzić do unfoldAsync.Używa Akka HTTP Http().singleRequest() API do podjęcia HttpRequest i produkować Future[HttpResponse]:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = 
    reqOption match { 
    case Some(req) => Http().singleRequest(req).flatMap { response => 
     // handle the error case. Here we just return the errored response 
     // with no next item. 
     if (response.status.isFailure()) Future.successful(Some(None -> response)) 

     // Otherwise, convert the response to a strict response by 
     // taking up the body and looking for a next request. 
     else convertToStrict(response).map { strictResponse => 
     getNextRequest(strictResponse) match { 
      // If we have no next request, return Some containing an 
      // empty state, but the current value 
      case None => Some(None -> strictResponse) 

      // Otherwise, pass on the request... 
      case next => Some(next -> strictResponse) 
     } 
     } 
    } 
    // Finally, there's no next request, end the stream by 
    // returning none as the state. 
    case None => Future.successful(None) 
    } 

Zauważ, że jeśli otrzymamy odpowiedź błędami, strumień nie będzie kontynuowana, ponieważ wracamy None w następnym stanie.

można wywołać tego, aby strumień HttpResponse obiektów tak:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com") 
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests) 

Co do zwrotu wartości ostatniego (lub błędami) odpowiedzi, po prostu trzeba użyć Sink.last, ponieważ Stream kończy się, gdy zakończy się pomyślnie lub na pierwszej błędnej odpowiedzi. Na przykład:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
     Some(initialRequest))(chainRequests) 
    .map(_.status) 
    .runWith(Sink.last) 
+0

Dziękuję. Bardzo inspirująca odpowiedź – Rabzu