Jeśli tworzysz robota sieciowego, spójrz na numer this post. Ta odpowiedź zajmuje się prostszym przypadkiem, takim jak pobieranie stronicowanych zasobów, gdzie link do następnej strony znajduje się w nagłówku bieżącej odpowiedzi strony.
Można utworzyć łańcuchowe źródło - tam, gdzie jeden element prowadzi do następnego - przy użyciu metody Source.unfoldAsync
. W tym celu funkcja pobiera element S
i zwraca Future[Option[(S, E)]]
, aby określić, czy strumień powinien nadal emitować elementy typu E
, przekazując stan do następnego wywołania.
W twoim przypadku, to jest coś w rodzaju:
- biorąc wstępną
HttpRequest
- wytwarzając
Future[HttpResponse]
- jeśli punkty odpowiedzią na inny adres URL, wracając
Some(request -> response)
, inaczej None
Jednak, istnieje zmarszczka, która polega na tym, że Nie wysyłaj odpowiedzi ze strumienia, jeśli nie zawiera wskaźnika do następnego żądania.
Aby obejść ten problem, można przekazać funkcję unfoldAsync
return Future[Option[(Option[HttpRequest], HttpResponse)]]
. To pozwala na obsługę następujących sytuacjach:
- obecna odpowiedź jest błąd
- aktualne punkty odpowiedzi na inne żądanie
- obecna reakcja nie wskazuje na inne żądanie
Poniżej znajduje się kod z adnotacjami opisujący to podejście, ale najpierw wstępny:
Podczas przesyłania strumieniowego żądań HTTP do odpowiedzi w Strumienie Akka, musisz upewnić się, że ciało odpowiedzi zostanie skonsumowane, w przeciwnym razie wydadzą się złe rzeczy (zakleszczenia itp.). Jeśli nie potrzebujesz ciała, możesz je zignorować, ale tutaj używamy funkcji konwersji HttpEntity
z (potencjalne) strumień do ścisłego jednostki:
import scala.concurrent.duration._
def convertToStrict(r: HttpResponse): Future[HttpResponse] =
r.entity.toStrict(10.minutes).map(e => r.withEntity(e))
Następnie kilka funkcji do tworzenia Option[HttpRequest]
z HttpResponse
. Ten przykład wykorzystuje schemat jak linki paginacji GitHub, gdzie nagłówek Links
zawiera, na przykład: <https://api.github.com/...> rel="next"
:
def nextUri(r: HttpResponse): Seq[Uri] = for {
linkHeader <- r.header[Link].toSeq
value <- linkHeader.values
params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri
def getNextRequest(r: HttpResponse): Option[HttpRequest] =
nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
Dalej, prawdziwą funkcję będziemy przechodzić do unfoldAsync
.Używa Akka HTTP Http().singleRequest()
API do podjęcia HttpRequest
i produkować Future[HttpResponse]
:
def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
reqOption match {
case Some(req) => Http().singleRequest(req).flatMap { response =>
// handle the error case. Here we just return the errored response
// with no next item.
if (response.status.isFailure()) Future.successful(Some(None -> response))
// Otherwise, convert the response to a strict response by
// taking up the body and looking for a next request.
else convertToStrict(response).map { strictResponse =>
getNextRequest(strictResponse) match {
// If we have no next request, return Some containing an
// empty state, but the current value
case None => Some(None -> strictResponse)
// Otherwise, pass on the request...
case next => Some(next -> strictResponse)
}
}
}
// Finally, there's no next request, end the stream by
// returning none as the state.
case None => Future.successful(None)
}
Zauważ, że jeśli otrzymamy odpowiedź błędami, strumień nie będzie kontynuowana, ponieważ wracamy None
w następnym stanie.
można wywołać tego, aby strumień HttpResponse
obiektów tak:
val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest)(chainRequests)
Co do zwrotu wartości ostatniego (lub błędami) odpowiedzi, po prostu trzeba użyć Sink.last
, ponieważ Stream kończy się, gdy zakończy się pomyślnie lub na pierwszej błędnej odpowiedzi. Na przykład:
def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest))(chainRequests)
.map(_.status)
.runWith(Sink.last)
Co powinien zmaterializować strumień, jeśli wszystkie wnioski zakończą się powodzeniem? Prawdopodobnie chcesz zdobyć trochę danych z nich? – Mikesname
Tak, chciałbym uzyskać HTTPResponse na końcu strumienia; po zmaterializowaniu chciałbym się dowiedzieć, czy był to sukces porażki i powód niepowodzenia. – Rabzu