2015-04-17 25 views
5

Próbuję przesyłać strumieniowo danych szeregów czasowych za pomocą Springframework SimpMessagingTemplate (domyślna implementacja Stomp) do emisji wiadomości do tematu, który subskrybował klient SockJS. Jednak wiadomości są odbierane poza kolejnością. Serwer jest pojedynczy wątek i wiadomości są wysyłane w porządku rosnącym według ich znaczników czasu. Klient w jakiś sposób odebrał wiadomości z zamówienia.SockJS odbierać wiadomości stomp z websocket wiosny z rzędu

Używam najnowszej wersji zarówno stompjs, jak i springframework (wydanie 4.1.6).

Odpowiedz

8

Znaleziono podstawową przyczynę tego problemu. Wiadomości wysyłały "poprawną" kolejność z perspektywy implementacji aplikacji (tj. ConvertAndSend() są wywoływane w jeden wątek lub przynajmniej wątkowo bezpieczny sposób) .Jednakże gniazdo sieciowe Springframework wykorzystuje implementację reaktora-tcp, która będzie przetwarzać komunikaty clientOutboundChannel z puli wątków, dzięki czemu komunikaty mogą być zapisywane w gnieździe tcp w innej kolejności, w jakiej zostały dostarczone.Kiedy skonfigurowałem gniazdo internetowe do ograniczenia 1 wątku dla klientaOutboundChannel, zamówienie zostanie zachowane

Ten problem dotyczy nie w SocketJS, ale ograniczenie bieżącego projektu gniazda sieci Spring

+0

W jaki sposób można skonfigurować gniazdo internetowe, aby ograniczyć 1 wątek dla clientOutboundChannel? – Tobia

+0

Czy możesz podać przykład? – Tobia

4

Jest to problem z projektowaniem gniazd internetowych wiosennych Aby odbierać wiadomości w prawidłowej kolejności należy ustawić corePoolSize klientów websocket na 1.

@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketMessageBrokerConfiguration extends AbstractWebSocketMessageBrokerConfigurer { 

    @Override 
    public void configureClientOutboundChannel(ChannelRegistration registration) { 
     registration.taskExecutor().corePoolSize(1); 
    } 

    @Override 
    public void configureClientInboundChannel(ChannelRegistration registration) { 
     registration.taskExecutor().corePoolSize(1); 
    } 
} 
0

Wystąpił również ten problem. Nie chcę ograniczać rozmiaru puli wątków do 1, ponieważ spowoduje to obciążenie aplikacji. Zamiast tego użyłem StripedExecutorService do przetwarzania wiadomości przychodzących i wychodzących z mojej aplikacji. Ten typ usługi executora gwarantuje uporządkowane przetwarzanie komunikatów dla zadań, które mają ten sam pasek. Dla mnie używam identyfikatora sesji WebSocket jako paska. Zarejestruj ten executor za pomocą metody ChannelRegistration.taskExecutor() na swoim kanale przychodzącym, brokerskim i wychodzącym, co zagwarantuje uporządkowane wiadomości. Wybierz swój pasek mądrze.