2016-04-30 12 views
5

Moja aplikacja ma interfejs Akka-Websocket. W skład gniazda internetowego wchodzi aktor-subskrybent i wydawca aktorski. Abonent obsługuje polecenia, wysyłając je do odpowiedniego aktora. Wydawca słucha strumienia zdarzeń i publikuje informacje o aktualizacjach z powrotem do strumienia (i tak w końcu do klienta). To działa dobrze.Jak wysłać wiadomość w strumieniu reaktywnym z subskrybenta do wydawcy w połączeniu z gniazdem sieciowym

Moje pytanie: Jak subskrybent może wysłać zdarzenie z powrotem do strumienia? Na przykład, aby potwierdzić wykonanie odebranego polecenia.

public class WebSocketApp extends HttpApp { 

    private static final Gson gson = new Gson(); 

    @Override 
    public Route createRoute() { 
    return get(
     path("metrics").route(handleWebSocketMessages(metrics())) 
     ); 
    } 

    private Flow<Message, Message, ?> metrics() { 
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props()); 
    Source<Message, ActorRef> metricsSource = 
     Source.actorPublisher(WebSocketDataPublisherActor.props()) 
     .map((measurementData) -> TextMessage.create(gson.toJson(measurementData))); 
    return Flow.fromSinkAndSource(metricsSink, metricsSource); 
    } 
} 

Miłym rozwiązaniem mogłoby być, że aktor z prenumeraty (the WebSocketCommandSubscriber aktor w powyższym kodzie) może wysłać wiadomość z powrotem do strumienia jak sender().tell(...) ...

Odpowiedz

4

Nie, to nie jest możliwe, nie bezpośrednio. Strumienie są zawsze jednokierunkowe - wszystkie wiadomości płyną w jednym kierunku, podczas gdy popyt na nie przepływa w przeciwnym kierunku. Musisz przesłać wiadomości potwierdzające z zlewu do źródła, aby te ostatnie wysłały je z powrotem do klienta, na przykład rejestrując aktora źródłowego w odtwarzaczu zlewu. To może wyglądać następująco:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> { 
    sinkActor.tell(new RegisterSource(sourceActor), null); 
}) 

Następnie po zlewu aktor otrzymuje RegisterSource wiadomości można wysyłać komunikaty potwierdzeń na podany ActorRef, który następnie przekazuje je do strumienia wyjściowego.