2016-03-25 11 views
5

Jak opisano akka streams documentation że próbował utworzyć pulę robocze (przepływów)siły roboczej o Akka strumieni

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = { 
    import GraphDSL.Implicits._ 

    Flow.fromGraph(GraphDSL.create() { implicit b => 
     val balancer = b.add(Balance[In](workerCount)) 
     val merge = b.add(Merge[Out](workerCount)) 

     for (_ <- 1 to workerCount) { 
     balancer ~> worker ~> merge 
     } 
     FlowShape(balancer.in, merge.out) 
    }) 
    } 

wtedy używane tej funkcji do prowadzenia przepływu równolegle:

def main(args: Array[String]) { 
    val system = ActorSystem() 
    implicit val mat = ActorMaterializer.create(system) 

    val flow = Flow[Int].map(e => { 
     println(e) 
     Thread.sleep(1000) // 1 second 
     e 
    }) 

    Source(Range.apply(1, 10).toList) 
     .via(balancer(flow, 3)) 
     .runForeach(e => {}) 
    } 

Otrzymuję oczekiwane wyjście 1, 2, 3, 4, 5, 6, 7, 8, 9, ale liczby pojawiają się z prędkością 1 na sekundę (brak równoległości). Co robię źle?

+0

co kontekst wykonanie? Jeśli używasz puli wątków o ustalonej wielkości, jest to normalna –

+0

Oznacza to, że domyślny rozmiar kontekstu wynosi 1? Czy mógłbyś podać jaki jest preferowany sposób konfiguracji kontekstu wykonania? – Mihai238

+0

Brak domyślnego kontekstu nie jest ustalony, prawdopodobnie importujesz globalny niejawny kontekst, który będzie zależał od zbyt dużej rzeczy jak wersja, możesz spróbować 'niejawny val ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))' –

Odpowiedz

1

Jak podkreślił Endre Varga, sam przepływ powinien być oznaczony jako .async.

Ale nawet wtedy zachowanie nie jest deterministyczne, ponieważ etapy asynchroniczne mają domyślny rozmiar bufora równy 16, a moduł równoważący może wysyłać wszystkie wiadomości do tego samego pracownika.

W rezultacie balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge doprowadziłoby do pożądanego zachowania.

na odpowiedź udzieloną przez członka projektu patrz: https://github.com/akka/akka/issues/20146#issuecomment-201381356

3

Dokumenty w tej sekcji są nieaktualne i zostaną poprawione w następnym wydaniu. Zasadniczo wystarczy wywołać .async na samym strumieniu. Robiąc to, narysujesz "skrzynkę" wokół przepływu (co możesz sobie wyobrazić jako pudełko z jednym portem wejściowym i wyjściowym), co zapobiegnie fuzji w tym polu. Robiąc to zasadniczo wszyscy pracownicy będą na dedykowanych aktorów. Reszta wykresu (etapy rozgłaszania i łączenia) będzie dzielić innego aktora (nie będą działać na osobnych aktorów, skrzynka asynchroniczna tylko chroni przepływ, rzeczy na zewnątrz nadal będą połączone).

+0

Wierzę, że tak powinno być, ale wierzę również że tak nie jest. 'for (i <- 1 to workerCount) {balancer ~> worker.async ~> merge}' wydaje się nie działać. – lpiepiora

+2

Jak zauważyłem w twoim bilecie (i po zabawie, sam się pomyliłem) okazuje się, że domyślny rozmiar bufora dla asynchronicznych etapów wynosi 16, a saldo kończy wysyłanie wszystkich wiadomości do pojedynczego etapu, ponieważ raportuje, że nadal ma bufor przestrzeń. Jeśli wyślesz więcej wiadomości (np. 100) lub ustawisz rozmiar bufora dla etapu roboczego na 1, zobaczysz pożądane wyniki. –

+0

Tak, prawda, ustawienie 'waitForAllDownstreams = true' może trochę pomóc. Wydaje mi się, że (tak naprawdę to nie sprawdziłem), jak już powiedziałeś, pierwsze raporty końcowe i wszystkie wiadomości są wysyłane do niego.Z 'waitForAllDownstreams' to szwy dystrybucji jest nieco lepsza – lpiepiora