2016-11-05 60 views
6

mam następujący prosty przypadek hierarchię klasy:Akka Strumienie podzielony przez strumień typu

sealed trait Message 
case class Foo(bar: Int) extends Message 
case class Baz(qux: String) extends Message 

i mam Flow[Message, Message, NotUsed] (od Protokół A websocket opartego o kodek już na miejscu).

Chcę zdemultipleksować ten Flow[Message] na osobne przepływy dla typów Foo i Baz, ponieważ są one przetwarzane przez zupełnie różne ścieżki.

Jaki jest najprostszy sposób to robić? Powinien być oczywisty, ale brakuje mi czegoś ...

Odpowiedz

5

Jednym ze sposobów jest użycie Utwórz RunnableGraph, który zawiera Flows dla każdego typu wiadomości.

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 

    val in = Source(...) // Some message source 
    val out = Sink.ignore 

    val foo = builder.add(Flow[Message].map (x => x match { case [email protected](_) => f })) 
    val baz = builder.add(Flow[Message].map (x => x match { case [email protected](_) => b })) 
    val partition = builder.add(Partition[Message](2, { 
    case Foo(_) => 0 
    case Baz(_) => 1 
    })) 

    partition ~> foo ~> // other Flow[Foo] here ~> out 
    partition ~> baz ~> // other Flow[Baz] here ~> out 

    ClosedShape 
} 

g.run() 
+0

Dobrze, partycja. OK, mogę to zrobić. Prawdopodobnie korzystałoby by z tego wbudowany kombinator; być może, zrobię żądanie ściągnięcia. –

+0

@AlexanderTemerev To może być interesujące: http://doc.akka.io/api/akka/2.4/?_ga=1.34091558.643806930.1478315511#akka.stream.scaladsl.Partition – Brian