Potrzebuję utworzyć akka.stream.scaladsl.Source[T, Unit]
z kolekcji Future[T]
.W pliku akka-stream, jak utworzyć nieuporządkowane źródło z kolekcji futures
przykład zawierającą zbiór futures powracających całkowitymi,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
Jak stworzyć
val source: Source[Int, Unit] = ???
od niego.
Nie mogę użyć kombinatora Future.sequence
, ponieważ od tego czasu czekałbym na każdą przyszłość przed uzyskaniem czegokolwiek od źródła. Chcę uzyskać wyniki w dowolnej kolejności jak tylko zakończy się jakakolwiek przyszłość.
Rozumiem, że Source
jest czysto funkcjonalnym API i nie powinien niczego uruchamiać przed jakimś materializowaniem. Tak, mój pomysł jest użycie Iterator
(który jest leniwy), aby utworzyć źródło:
Source {() =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
Ale to byłoby źródłem futures, nie rzeczywistych wartości. Mogę też zablokować na next
używając Await.result(future)
, ale nie jestem pewien, który wątek puli wątku będzie zablokowany. Również to będzie sekwencyjnie wywoływać transakcje futures, podczas gdy ja potrzebuję równoległego wykonania.
UPDATE 2: okazało się, że był to znacznie łatwiejszy sposób to zrobić (dzięki Wiktora Klang):
Source(futures).mapAsync(1)(identity)
UPDATE: oto co mam na podstawie @sschaef odpowiedź:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
def run(actor: ActorRef): Unit = {
futures.foreach { future =>
future.onComplete {
case Success(value) =>
actor ! value
case Failure(NonFatal(t)) =>
actor ! Status.Failure(t) // to signal error
}
}
Future.sequence(futures).onSuccess { case _ =>
actor ! Status.Success(()) // to signal stream's end
}
}
Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}
// ScalaTest tests follow
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
"futuresToSource" should "convert futures collection to akka-stream source" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future(3)
whenReady {
futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
} { results =>
results should contain theSameElementsAs Seq(1, 2, 3)
}
}
it should "fail on future failure" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future.failed(new RuntimeException("future failed"))
whenReady {
futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
} { t =>
t shouldBe a [RuntimeException]
t should have message "future failed"
}
}
Co jeśli moja przyszłość jest typu "Przyszłość [Źródło [T, jednostka]]" - czy mogę zrobić coś lepszego niż 'Źródło (futures) .mapAsyncUnordered (1) (tożsamość) .flatten (FlattenStrategy.concat)'? Chciałbym, żeby to spłaszczenie było nieuporządkowane, a także wspierać poziom równoległości. – Tvaroh
Jestem obecnie (ilekroć znajdę godziny lub dwie) pracujący nad 'flatten (FlattenStrategy.merge)', który zrobiłby to, co chcesz. W międzyczasie możesz użyć 'mapAsyncUnordered (par) (identity)' + implementacja FlexiMerge? –
Viktor, nie patrzyłem na FlexiMerge, spróbuję. Dziękuję Ci. – Tvaroh