2015-04-09 17 views
25

Próbuję utworzyć statyczny klaster dwóch aplikacji Spring Boot z osadzonymi serwerami HornetQ. Jedna aplikacja/serwer będzie obsługiwać zdarzenia zewnętrzne i generować komunikaty, które będą wysyłane do kolejki komunikatów. Druga aplikacja/serwer będzie nasłuchiwać w kolejce komunikatów i przetwarzać wiadomości przychodzące. Ponieważ połączenie między tymi dwoma aplikacjami jest niewiarygodne, każdy będzie używał tylko klientów lokalnych/wVM do generowania/odbierania komunikatów na swoim serwerze i polegając na funkcji grupowania, aby przekazywać wiadomości do kolejki na innym serwerze w klastrze.Spring Boot osadzony klaster HornetQ nie przekazuje wiadomości

Używam HornetQConfigurationCustomizer do dostosowania osadzonego serwera HornetQ, ponieważ domyślnie jest dostarczany tylko z InVMConnectorFactory.

Stworzyłem kilka przykładowych aplikacji, które ilustrują tę konfigurację, w tym przykładzie "ServerSend", odnosi się do serwera, który będzie produkować wiadomości, a "ServerReceive" odnosi się do serwera, który będzie zużywać wiadomości.

pom.xml do zastosowań należą:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-hornetq</artifactId> 
</dependency> 
<dependency> 
    <groupId>org.hornetq</groupId> 
    <artifactId>hornetq-jms-server</artifactId> 
</dependency> 

DemoHornetqServerSendApplication:

@SpringBootApplication 
@EnableScheduling 
public class DemoHornetqServerSendApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${spring.hornetq.embedded.queues}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqServerSendApplication.class, args); 
    } 

    @Scheduled(fixedRate = 5000) 
    private void sendMessage() { 
     String message = "Timestamp from Server: " + System.currentTimeMillis(); 
     System.out.println("Sending message: " + message); 
     jmsTemplate.convertAndSend(testQueue, message); 
    } 

    @Bean 
    public HornetQConfigurationCustomizer hornetCustomizer() { 
     return new HornetQConfigurationCustomizer() { 

      @Override 
      public void customize(Configuration configuration) { 
       String serverSendConnectorName = "server-send-connector"; 
       String serverReceiveConnectorName = "server-receive-connector"; 

       Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations(); 

       Map<String, Object> params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5445"); 
       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverSendConnectorName, tc); 

       Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations(); 
       tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); 
       acceptors.add(tc); 

       params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5446"); 
       tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverReceiveConnectorName, tc); 

       List<String> staticConnectors = new ArrayList<String>(); 
       staticConnectors.add(serverReceiveConnectorName); 
       ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
         "my-cluster", // name 
         "jms", // address 
         serverSendConnectorName, // connector name 
         500, // retry interval 
         true, // duplicate detection 
         true, // forward when no consumers 
         1, // max hops 
         1000000, // confirmation window size 
         staticConnectors, 
         true // allow direct connections only 
         ); 
       configuration.getClusterConfigurations().add(conf); 

       AddressSettings setting = new AddressSettings(); 
       setting.setRedistributionDelay(0); 
       configuration.getAddressesSettings().put("#", setting); 
      } 
     }; 
    } 
} 

application.properties (ServerSend):

spring.hornetq.mode=embedded 
spring.hornetq.embedded.enabled=true 
spring.hornetq.embedded.queues=jms.testqueue 
spring.hornetq.embedded.cluster-password=password 

DemoHornetqServerReceiveApplication:

@SpringBootApplication 
@EnableJms 
public class DemoHornetqServerReceiveApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${spring.hornetq.embedded.queues}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqServerReceiveApplication.class, args); 
    } 

    @JmsListener(destination="${spring.hornetq.embedded.queues}") 
    public void receiveMessage(String message) { 
     System.out.println("Received message: " + message); 
    } 

    @Bean 
    public HornetQConfigurationCustomizer hornetCustomizer() { 
     return new HornetQConfigurationCustomizer() { 

      @Override 
      public void customize(Configuration configuration) { 
       String serverSendConnectorName = "server-send-connector"; 
       String serverReceiveConnectorName = "server-receive-connector"; 

       Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations(); 

       Map<String, Object> params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5446"); 
       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverReceiveConnectorName, tc); 

       Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations(); 
       tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); 
       acceptors.add(tc); 

       params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5445"); 
       tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverSendConnectorName, tc); 

       List<String> staticConnectors = new ArrayList<String>(); 
       staticConnectors.add(serverSendConnectorName); 
       ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
         "my-cluster", // name 
         "jms", // address 
         serverReceiveConnectorName, // connector name 
         500, // retry interval 
         true, // duplicate detection 
         true, // forward when no consumers 
         1, // max hops 
         1000000, // confirmation window size 
         staticConnectors, 
         true // allow direct connections only 
         ); 
       configuration.getClusterConfigurations().add(conf); 

       AddressSettings setting = new AddressSettings(); 
       setting.setRedistributionDelay(0); 
       configuration.getAddressesSettings().put("#", setting); 
      } 
     }; 
    } 
} 

application.properties (ServerReceive):

spring.hornetq.mode=embedded 
spring.hornetq.embedded.enabled=true 
spring.hornetq.embedded.queues=jms.testqueue 
spring.hornetq.embedded.cluster-password=password 

Po uruchomieniu aplikacji zarówno zalogować wyjście to pokazuje:

ServerSend:

2015-04-09 11:11: 58.471 INFO 7536 --- [główny] org.hornetq.core.server: HQ221000: serwer na żywo zaczyna się konfiguracją Konfiguracja HornetQ (klaster = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Loca l \ Temp \ hornetq-data/journal, bindingsDirectory = data/bindingings, largeMessagesDirectory = data/largemessages, pagingDirectory = data/paging)
2015-04-09 11: 11: 58.501 INFO 7536 --- [główny] org. hornetq.core.server: HQ221045: libaio nie jest dostępny, przełącza konfigurację na NIO
2015-04-09 11: 11: 58.595 INFO 7536 --- [główny] org.hornetq.core.server: HQ221043: Dodawanie protokołu obsługa CORE
2015-04-09 11: 11: 58.720 INFO 7536 --- [główny] org.hornetq.core.server: HQ221003: próba wdrożenia kolejki jms.queue.jms.testqueue
2015-04-09 11: 11: 59.568 INFO 7536 --- [główny] org.hornetq.core.server: HQ221020: Uruchomiony Netty Acceptor w wersji 4.0.13. Główny localhost: 5445
2015-04-09 11: 1 1: 59.593 INFO 7536 --- [główny] org.hornetq.core.server: HQ221007: Serwer jest teraz dostępny na żywo
2015-04-09 11: 11: 59.593 INFO 7536 --- [główny] org.hornetq.core .server: HQ221001: HornetQ Server wersja 2.4.5.KOŃCOWA (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]

ServerReceive:

2015-04-09 11: 12: 04,401 INFORMACJE 4528 --- [głównym] org.hornetq.core.server: HQ221000: serwer na żywo zaczyna się konfiguracją Konfiguracja HornetQ (clustered = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Local \ Temp \ hornetq- data/journal, bindingsDirectory = data/bindingings, largeMessagesDirectory = data/largemessages, pagingDirectory = data/paging)
2015-04-09 11: 12: 04.410 INFO 4528 --- [główne] org.hornetq.core.server: HQ221045: libaio nie jest dostępny, przełączanie konfiguracja do NIO
2015-04-09 11: 12: 04.520 INFO 4528 --- [główny] org.hornetq.core.server: HQ221043: Dodanie obsługi protokołu CORE
2015-04-09 11: 12: 04.629 INFO 4528 --- [główny] org.hornetq.core.server: HQ221003: próba wdrożenia kolejki jms.queue.jms.testqueue
2015-04-09 11: 12: 05.545 INFO 4528 --- [główny] org. hornetq.core.server: HQ221020: Rozpoczęty Netty Acceptor w wersji 4.0.13. Główny localhost: 5446
2015-04-09 11: 12: 05.578 INFO 4528 --- [główny] org.hornetq.core.server: HQ221007: Serwer jest teraz dostępny na żywo
2015-04-09 11: 12: 05.578 INFO 4528 --- [główny] org.hornetq.core.server: HQ221001: Serwer HornetQ wersja 2.4.5.FINAL (Wild Hornet, 124) [c139929d -d90f-11e4-ba2e -e58abf5d6944]

widzę clustered=true obu wyjściach a to pokazują false jeśli usuwa konfigurację klastra z HornetQConfigurationCustomizer, a więc musi mieć pewien wpływ.

Teraz ServerSend pokazuje to na wyjściu konsoli:

wysyłanie wiadomości: Znacznik czasu z serwerem: 1428574324910
wysyłanie wiadomości: Znacznik czasu z serwerem: 1428574329899
wysyłanie wiadomości: Znacznik czasu z serwerem: 1428574334904

Jednak ServerReceive nic nie pokazuje.

Wygląda na to, że wiadomości nie są przekazywane z usługi ServerSend do ServerReceive.

Zrobiłem więcej testów, tworząc dwie dodatkowe aplikacje Spring Boot (ClientSend i ClientReceive), które mają , a nie osadzony serwer HornetQ i zamiast tego łączą się z "rodzimym" serwerem.

pom.xml zarówno dla aplikacji klienckich zawiera:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-hornetq</artifactId> 
</dependency> 

DemoHornetqClientSendApplication:

@SpringBootApplication 
@EnableScheduling 
public class DemoHornetqClientSendApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${queue}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqClientSendApplication.class, args); 
    } 

    @Scheduled(fixedRate = 5000) 
    private void sendMessage() { 
     String message = "Timestamp from Client: " + System.currentTimeMillis(); 
     System.out.println("Sending message: " + message); 
     jmsTemplate.convertAndSend(testQueue, message); 
    } 
} 

application.properties (ClientSend):

spring.hornetq.mode=native 
spring.hornetq.host=localhost 
spring.hornetq.port=5446 

queue=jms.testqueue 

DemoHornetqClientReceiveApplication:

@SpringBootApplication 
@EnableJms 
public class DemoHornetqClientReceiveApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${queue}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqClientReceiveApplication.class, args); 
    } 

    @JmsListener(destination="${queue}") 
    public void receiveMessage(String message) { 
     System.out.println("Received message: " + message); 
    } 
} 

aplikacji.Właściwości (ClientReceive):

spring.hornetq.mode=native 
spring.hornetq.host=localhost 
spring.hornetq.port=5445 

queue=jms.testqueue 

Teraz konsola pokazuje to:

ServerReveive:

odebrane wiadomości: Datownik z Klientem: 1428574966630
odebrane wiadomości: Datownik z Klientem: 1428574971600
Otrzymano wiadomość: datownik od klienta: 1428574976595

ClientReceive:

odebrane wiadomości: Znacznik czasu z serwerem: 1428574969436
odebrane wiadomości: Znacznik czasu z serwerem: 1428574974438
odebrane wiadomości: Znacznik czasu z serwerem: 1428574979446

Jeśli mam ServerSend działa na chwilę, a następnie uruchom ClientReceive, odbiera także wszystkie wiadomości w kolejce do tego momentu, więc to pokazuje, że wiadomości nie znikają gdzieś lub pobierają je gdzie indziej.

Dla zapewnienia kompletności wskazałem również pozycję ClientSend na ServerSend i ClientReceive na ServerReceive, aby sprawdzić, czy występuje problem z klastrowaniem i klientami InVM, ale ponownie nie było żadnego wskazania, że ​​jakakolwiek wiadomość została odebrana w ClientReceive lub ServerReceive.

Tak więc wydaje się, że dostarczanie wiadomości do/z każdego z brokerów wbudowanych do bezpośrednio podłączonych klientów zewnętrznych działa dobrze, ale żadne wiadomości nie są przekazywane między brokerami w klastrze.

Po tym wszystkim, wielkie pytanie, co jest nie tak z ustawieniem, że wiadomości nie są przekazywane wewnątrz klastra?

+0

Czy znalazłeś odpowiedź na to pytanie w ubiegłym miesiącu? W Internecie (oprócz tego posta) nie znalazłem nic o wiosennej inicjalizacji komunikacji hornetq pomiędzy dwoma procesami ... –

+0

@ deepdownunder2222 jak dotąd nie miałem szczęścia, odniosłem sukces w łączeniu dwóch zewnętrznych brokerów w ten sposób, ale nawet to nie działał w 100% tak jak się spodziewałem, więc zaparkowałem to na chwilę. Udało mi się zrobić to, co było mi potrzebne w ActiveMq, ale jeszcze nie minęło czasu prototypu. To, co opisałem w pytaniu, nie wydaje się być popularną rzeczą, nie było jeszcze dużego zainteresowania. –

+1

Czy zamiast tego tworzyłeś most, architektura wydaje się lepiej dopasowywać do tego, co chcesz osiągnąć: http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/ html/core-bridges.html – grahamrb

Odpowiedz

0

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

„HornetQ rdzeń jest zaprojektowany jako zestaw prostych POJOs więc jeśli masz aplikację, która wymaga funkcjonalność wiadomości wewnętrznie, ale nie chcesz, aby odsłonić że jako serwer HornetQ można bezpośrednio instancji i osadzić Serwery HornetQ we własnej aplikacji. "

Jeśli je umieszczasz, nie ujawniasz go jako serwera. Każdy z twoich kontenerów ma oddzielną instancję. Jest to odpowiednik uruchomienia 2 kopii szerszenia i nadania im tej samej nazwy kolejki. Jeden zapisuje do kolejki w pierwszej instancji, a drugi słucha kolejki w drugiej instancji.

Jeśli chcesz rozdzielić aplikacje w ten sposób, musisz mieć jedno miejsce działające jak serwer. Prawdopodobnie chcesz się skupić. To nie jest specyficzne dla Hornet, BTW. Ten wzór często się pojawia.

+0

Nie jestem przekonany przez tę odpowiedź. 1. Cytowane dokumenty nie stwierdzają wyraźnie, że tego, co próbuję zrobić, nie może być zrobione. 2. Już ujawniam wbudowany serwer dla dostępu zewnętrznego, tj. Łącząc się z nim z zewnętrznego klienta, nie ma z tym problemu. 3. Tak, klastrowanie jest dokładnie tym, czego szukam i właśnie o to pytam, a także dokładnie to, co _nie_ działa –

+0

ci_ faktycznie tworzenie klastrów NIE jest tym, czego szukasz. Klastry są używane w celu zapewnienia nadmiarowości oraz dystrybucji i współdzielenia obciążenia. Próbujesz użyć jednego serwera do wysyłania bezpośrednio do innego serwera. Czy można to zrobić? tak. Czy to jest celem klastrowania? Nie. – pczeus