Próbuję utworzyć statyczny klaster dwóch aplikacji Spring Boot z osadzonymi serwerami HornetQ. Jedna aplikacja/serwer będzie obsługiwać zdarzenia zewnętrzne i generować komunikaty, które będą wysyłane do kolejki komunikatów. Druga aplikacja/serwer będzie nasłuchiwać w kolejce komunikatów i przetwarzać wiadomości przychodzące. Ponieważ połączenie między tymi dwoma aplikacjami jest niewiarygodne, każdy będzie używał tylko klientów lokalnych/wVM do generowania/odbierania komunikatów na swoim serwerze i polegając na funkcji grupowania, aby przekazywać wiadomości do kolejki na innym serwerze w klastrze.Spring Boot osadzony klaster HornetQ nie przekazuje wiadomości
Używam HornetQConfigurationCustomizer
do dostosowania osadzonego serwera HornetQ, ponieważ domyślnie jest dostarczany tylko z InVMConnectorFactory
.
Stworzyłem kilka przykładowych aplikacji, które ilustrują tę konfigurację, w tym przykładzie "ServerSend", odnosi się do serwera, który będzie produkować wiadomości, a "ServerReceive" odnosi się do serwera, który będzie zużywać wiadomości.
pom.xml do zastosowań należą:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms-server</artifactId>
</dependency>
DemoHornetqServerSendApplication:
@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${spring.hornetq.embedded.queues}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqServerSendApplication.class, args);
}
@Scheduled(fixedRate = 5000)
private void sendMessage() {
String message = "Timestamp from Server: " + System.currentTimeMillis();
System.out.println("Sending message: " + message);
jmsTemplate.convertAndSend(testQueue, message);
}
@Bean
public HornetQConfigurationCustomizer hornetCustomizer() {
return new HornetQConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
String serverSendConnectorName = "server-send-connector";
String serverReceiveConnectorName = "server-receive-connector";
Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5445");
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverSendConnectorName, tc);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
acceptors.add(tc);
params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5446");
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverReceiveConnectorName, tc);
List<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(serverReceiveConnectorName);
ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
"my-cluster", // name
"jms", // address
serverSendConnectorName, // connector name
500, // retry interval
true, // duplicate detection
true, // forward when no consumers
1, // max hops
1000000, // confirmation window size
staticConnectors,
true // allow direct connections only
);
configuration.getClusterConfigurations().add(conf);
AddressSettings setting = new AddressSettings();
setting.setRedistributionDelay(0);
configuration.getAddressesSettings().put("#", setting);
}
};
}
}
application.properties (ServerSend):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
DemoHornetqServerReceiveApplication:
@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${spring.hornetq.embedded.queues}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
}
@JmsListener(destination="${spring.hornetq.embedded.queues}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
@Bean
public HornetQConfigurationCustomizer hornetCustomizer() {
return new HornetQConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
String serverSendConnectorName = "server-send-connector";
String serverReceiveConnectorName = "server-receive-connector";
Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5446");
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverReceiveConnectorName, tc);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
acceptors.add(tc);
params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5445");
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverSendConnectorName, tc);
List<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(serverSendConnectorName);
ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
"my-cluster", // name
"jms", // address
serverReceiveConnectorName, // connector name
500, // retry interval
true, // duplicate detection
true, // forward when no consumers
1, // max hops
1000000, // confirmation window size
staticConnectors,
true // allow direct connections only
);
configuration.getClusterConfigurations().add(conf);
AddressSettings setting = new AddressSettings();
setting.setRedistributionDelay(0);
configuration.getAddressesSettings().put("#", setting);
}
};
}
}
application.properties (ServerReceive):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
Po uruchomieniu aplikacji zarówno zalogować wyjście to pokazuje:
ServerSend:
2015-04-09 11:11: 58.471 INFO 7536 --- [główny] org.hornetq.core.server: HQ221000: serwer na żywo zaczyna się konfiguracją Konfiguracja HornetQ (klaster = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Loca l \ Temp \ hornetq-data/journal, bindingsDirectory = data/bindingings, largeMessagesDirectory = data/largemessages, pagingDirectory = data/paging)
2015-04-09 11: 11: 58.501 INFO 7536 --- [główny] org. hornetq.core.server: HQ221045: libaio nie jest dostępny, przełącza konfigurację na NIO
2015-04-09 11: 11: 58.595 INFO 7536 --- [główny] org.hornetq.core.server: HQ221043: Dodawanie protokołu obsługa CORE
2015-04-09 11: 11: 58.720 INFO 7536 --- [główny] org.hornetq.core.server: HQ221003: próba wdrożenia kolejki jms.queue.jms.testqueue
2015-04-09 11: 11: 59.568 INFO 7536 --- [główny] org.hornetq.core.server: HQ221020: Uruchomiony Netty Acceptor w wersji 4.0.13. Główny localhost: 5445
2015-04-09 11: 1 1: 59.593 INFO 7536 --- [główny] org.hornetq.core.server: HQ221007: Serwer jest teraz dostępny na żywo
2015-04-09 11: 11: 59.593 INFO 7536 --- [główny] org.hornetq.core .server: HQ221001: HornetQ Server wersja 2.4.5.KOŃCOWA (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
ServerReceive:
2015-04-09 11: 12: 04,401 INFORMACJE 4528 --- [głównym] org.hornetq.core.server: HQ221000: serwer na żywo zaczyna się konfiguracją Konfiguracja HornetQ (clustered = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Local \ Temp \ hornetq- data/journal, bindingsDirectory = data/bindingings, largeMessagesDirectory = data/largemessages, pagingDirectory = data/paging)
2015-04-09 11: 12: 04.410 INFO 4528 --- [główne] org.hornetq.core.server: HQ221045: libaio nie jest dostępny, przełączanie konfiguracja do NIO
2015-04-09 11: 12: 04.520 INFO 4528 --- [główny] org.hornetq.core.server: HQ221043: Dodanie obsługi protokołu CORE
2015-04-09 11: 12: 04.629 INFO 4528 --- [główny] org.hornetq.core.server: HQ221003: próba wdrożenia kolejki jms.queue.jms.testqueue
2015-04-09 11: 12: 05.545 INFO 4528 --- [główny] org. hornetq.core.server: HQ221020: Rozpoczęty Netty Acceptor w wersji 4.0.13. Główny localhost: 5446
2015-04-09 11: 12: 05.578 INFO 4528 --- [główny] org.hornetq.core.server: HQ221007: Serwer jest teraz dostępny na żywo
2015-04-09 11: 12: 05.578 INFO 4528 --- [główny] org.hornetq.core.server: HQ221001: Serwer HornetQ wersja 2.4.5.FINAL (Wild Hornet, 124) [c139929d -d90f-11e4-ba2e -e58abf5d6944]
widzę clustered=true
obu wyjściach a to pokazują false
jeśli usuwa konfigurację klastra z HornetQConfigurationCustomizer
, a więc musi mieć pewien wpływ.
Teraz ServerSend pokazuje to na wyjściu konsoli:
wysyłanie wiadomości: Znacznik czasu z serwerem: 1428574324910
wysyłanie wiadomości: Znacznik czasu z serwerem: 1428574329899
wysyłanie wiadomości: Znacznik czasu z serwerem: 1428574334904
Jednak ServerReceive nic nie pokazuje.
Wygląda na to, że wiadomości nie są przekazywane z usługi ServerSend do ServerReceive.
Zrobiłem więcej testów, tworząc dwie dodatkowe aplikacje Spring Boot (ClientSend i ClientReceive), które mają , a nie osadzony serwer HornetQ i zamiast tego łączą się z "rodzimym" serwerem.
pom.xml zarówno dla aplikacji klienckich zawiera:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
DemoHornetqClientSendApplication:
@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${queue}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqClientSendApplication.class, args);
}
@Scheduled(fixedRate = 5000)
private void sendMessage() {
String message = "Timestamp from Client: " + System.currentTimeMillis();
System.out.println("Sending message: " + message);
jmsTemplate.convertAndSend(testQueue, message);
}
}
application.properties (ClientSend):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446
queue=jms.testqueue
DemoHornetqClientReceiveApplication:
@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${queue}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
}
@JmsListener(destination="${queue}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
aplikacji.Właściwości (ClientReceive):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445
queue=jms.testqueue
Teraz konsola pokazuje to:
ServerReveive:
odebrane wiadomości: Datownik z Klientem: 1428574966630
odebrane wiadomości: Datownik z Klientem: 1428574971600
Otrzymano wiadomość: datownik od klienta: 1428574976595
ClientReceive:
odebrane wiadomości: Znacznik czasu z serwerem: 1428574969436
odebrane wiadomości: Znacznik czasu z serwerem: 1428574974438
odebrane wiadomości: Znacznik czasu z serwerem: 1428574979446
Jeśli mam ServerSend działa na chwilę, a następnie uruchom ClientReceive, odbiera także wszystkie wiadomości w kolejce do tego momentu, więc to pokazuje, że wiadomości nie znikają gdzieś lub pobierają je gdzie indziej.
Dla zapewnienia kompletności wskazałem również pozycję ClientSend na ServerSend i ClientReceive na ServerReceive, aby sprawdzić, czy występuje problem z klastrowaniem i klientami InVM, ale ponownie nie było żadnego wskazania, że jakakolwiek wiadomość została odebrana w ClientReceive lub ServerReceive.
Tak więc wydaje się, że dostarczanie wiadomości do/z każdego z brokerów wbudowanych do bezpośrednio podłączonych klientów zewnętrznych działa dobrze, ale żadne wiadomości nie są przekazywane między brokerami w klastrze.
Po tym wszystkim, wielkie pytanie, co jest nie tak z ustawieniem, że wiadomości nie są przekazywane wewnątrz klastra?
Czy znalazłeś odpowiedź na to pytanie w ubiegłym miesiącu? W Internecie (oprócz tego posta) nie znalazłem nic o wiosennej inicjalizacji komunikacji hornetq pomiędzy dwoma procesami ... –
@ deepdownunder2222 jak dotąd nie miałem szczęścia, odniosłem sukces w łączeniu dwóch zewnętrznych brokerów w ten sposób, ale nawet to nie działał w 100% tak jak się spodziewałem, więc zaparkowałem to na chwilę. Udało mi się zrobić to, co było mi potrzebne w ActiveMq, ale jeszcze nie minęło czasu prototypu. To, co opisałem w pytaniu, nie wydaje się być popularną rzeczą, nie było jeszcze dużego zainteresowania. –
Czy zamiast tego tworzyłeś most, architektura wydaje się lepiej dopasowywać do tego, co chcesz osiągnąć: http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/ html/core-bridges.html – grahamrb