Moja aplikacja ma interfejs Akka-Websocket. W skład gniazda internetowego wchodzi aktor-subskrybent i wydawca aktorski. Abonent obsługuje polecenia, wysyłając je do odpowiedniego aktora. Wydawca słucha strumienia zdarzeń i publikuje informacje o aktualizacjach z powrotem do strumienia (i tak w końcu do klienta). To działa dobrze.Jak wysłać wiadomość w strumieniu reaktywnym z subskrybenta do wydawcy w połączeniu z gniazdem sieciowym
Moje pytanie: Jak subskrybent może wysłać zdarzenie z powrotem do strumienia? Na przykład, aby potwierdzić wykonanie odebranego polecenia.
public class WebSocketApp extends HttpApp {
private static final Gson gson = new Gson();
@Override
public Route createRoute() {
return get(
path("metrics").route(handleWebSocketMessages(metrics()))
);
}
private Flow<Message, Message, ?> metrics() {
Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
Source<Message, ActorRef> metricsSource =
Source.actorPublisher(WebSocketDataPublisherActor.props())
.map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
return Flow.fromSinkAndSource(metricsSink, metricsSource);
}
}
Miłym rozwiązaniem mogłoby być, że aktor z prenumeraty (the WebSocketCommandSubscriber
aktor w powyższym kodzie) może wysłać wiadomość z powrotem do strumienia jak sender().tell(...)
...