2015-09-06 22 views
7

Potrzebuję utworzyć akka.stream.scaladsl.Source[T, Unit] z kolekcji Future[T].W pliku akka-stream, jak utworzyć nieuporządkowane źródło z kolekcji futures

przykład zawierającą zbiór futures powracających całkowitymi,

val f1: Future[Int] = ??? 
val f2: Future[Int] = ??? 
val fN: Future[Int] = ??? 
val futures = List(f1, f2, fN) 

Jak stworzyć

val source: Source[Int, Unit] = ??? 

od niego.

Nie mogę użyć kombinatora Future.sequence, ponieważ od tego czasu czekałbym na każdą przyszłość przed uzyskaniem czegokolwiek od źródła. Chcę uzyskać wyniki w dowolnej kolejności jak tylko zakończy się jakakolwiek przyszłość.

Rozumiem, że Source jest czysto funkcjonalnym API i nie powinien niczego uruchamiać przed jakimś materializowaniem. Tak, mój pomysł jest użycie Iterator (który jest leniwy), aby utworzyć źródło:

Source {() => 
    new Iterator[Future[Int]] { 
    override def hasNext: Boolean = ??? 
    override def next(): Future[Int] = ??? 
    } 
} 

Ale to byłoby źródłem futures, nie rzeczywistych wartości. Mogę też zablokować na next używając Await.result(future), ale nie jestem pewien, który wątek puli wątku będzie zablokowany. Również to będzie sekwencyjnie wywoływać transakcje futures, podczas gdy ja potrzebuję równoległego wykonania.

UPDATE 2: okazało się, że był to znacznie łatwiejszy sposób to zrobić (dzięki Wiktora Klang):

Source(futures).mapAsync(1)(identity) 

UPDATE: oto co mam na podstawie @sschaef odpowiedź:

def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = { 
    def run(actor: ActorRef): Unit = { 
    futures.foreach { future => 
     future.onComplete { 
     case Success(value) => 
      actor ! value 
     case Failure(NonFatal(t)) => 
      actor ! Status.Failure(t) // to signal error 
     } 
    } 

    Future.sequence(futures).onSuccess { case _ => 
     actor ! Status.Success(()) // to signal stream's end 
    } 
    } 

    Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run) 
} 

// ScalaTest tests follow 

import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

"futuresToSource" should "convert futures collection to akka-stream source" in { 
    val f1 = Future(1) 
    val f2 = Future(2) 
    val f3 = Future(3) 

    whenReady { 
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _) 
    } { results => 
    results should contain theSameElementsAs Seq(1, 2, 3) 
    } 
} 

it should "fail on future failure" in { 
    val f1 = Future(1) 
    val f2 = Future(2) 
    val f3 = Future.failed(new RuntimeException("future failed")) 

    whenReady { 
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed 
    } { t => 
    t shouldBe a [RuntimeException] 
    t should have message "future failed" 
    } 
} 

Odpowiedz

6

Tworzenie źródła Futures, a następnie "spłaszczyć" go poprzez mapAsync:

scala> Source(List(f1,f2,fN)).mapAsync(1)(identity) 
res0: akka.stream.scaladsl.Source[Int,Unit] = [email protected] 
+0

Co jeśli moja przyszłość jest typu "Przyszłość [Źródło [T, jednostka]]" - czy mogę zrobić coś lepszego niż 'Źródło (futures) .mapAsyncUnordered (1) (tożsamość) .flatten (FlattenStrategy.concat)'? Chciałbym, żeby to spłaszczenie było nieuporządkowane, a także wspierać poziom równoległości. – Tvaroh

+0

Jestem obecnie (ilekroć znajdę godziny lub dwie) pracujący nad 'flatten (FlattenStrategy.merge)', który zrobiłby to, co chcesz. W międzyczasie możesz użyć 'mapAsyncUnordered (par) (identity)' + implementacja FlexiMerge? –

+0

Viktor, nie patrzyłem na FlexiMerge, spróbuję. Dziękuję Ci. – Tvaroh

5

Jednym z najprostszych sposobów, by nakarmić źródłem jest przez aktora:

import scala.concurrent.Future 
import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 

implicit val system = ActorSystem("MySystem") 

def run(actor: ActorRef): Unit = { 
    import system.dispatcher 
    Future { Thread.sleep(100); actor ! 1 } 
    Future { Thread.sleep(200); actor ! 2 } 
    Future { Thread.sleep(300); actor ! 3 } 
} 

val source = Source 
    .actorRef[Int](0, OverflowStrategy.fail) 
    .mapMaterializedValue(ref ⇒ run(ref)) 
implicit val m = ActorMaterializer() 

source runForeach { int ⇒ 
    println(s"received: $int") 
} 

Aktor jest tworzony za pomocą metody Source.actorRef i udostępniany za pomocą metody mapMaterializedValue. run po prostu pobiera Aktor i wysyła do niego wszystkie wypełnione wartości, do których można uzyskać dostęp poprzez source. W powyższym przykładzie wartości są wysyłane bezpośrednio w Przyszłości, ale można to oczywiście zrobić wszędzie (na przykład w zaproszeniu onComplete na przyszłość).

+0

BTW, dlaczego pierwszym argumentem jest '' actorRef' 0'? Czy to ma znaczenie? – Tvaroh

+0

Jeśli konsument może pobrać wszystkie elementy ze źródła, to na pewno nie potrzebujesz pamięci podręcznej, dlatego jest 0. – sschaef

+0

Próbowałem, ale zero nie działało (było wyrzucanie wyjątku). Rozmiar równy wielkości kolekcji futures działał bez zarzutu. – Tvaroh