Badam rozwiązania kolejkowania dla jednej z aplikacji mojego zespołu. Idealnie byłoby coś, co można skonfigurować zarówno jako lekkiego, pośredniczącego w procesie brokera (do przesyłania komunikatów o małej przepustowości między wątkami), jak i jako zewnętrznego brokera. Czy istnieje serwer MQ, który może to zrobić? Większość wydaje się wymagać konfiguracji jako zewnętrznego podmiotu. Wydaje się, że ZeroMQ jest najbliższy rozwiązaniu procesowemu, ale wydaje się być raczej "gniazdem UDP na sterydach" i potrzebujemy niezawodnej dostawy.Czy istnieją serwery MQ, które mogą być osadzone w procesie Java?
Odpowiedz
Tak jak powiedzieliśmy, ActiveMQ
jest nieco cięższy niż ZeroMQ
, ale działa bardzo dobrze jako proces wbudowany. Oto prosty przykład z Spring
i ActiveMQ
.
słuchacza wiadomość, która będzie używana w celu przetestowania kolejki: konfigurację
public class TestMessageListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class);
@Override
public void onMessage(Message message) {
/* Receive the text message */
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
System.out.println("Message reception from the JMS queue : " + text);
} catch (JMSException e) {
logger.error("Error : " + e.getMessage());
}
} else {
/* Handle non text message */
}
}
}
ActiveMQ
kontekstowego:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61617</value>
</property>
</bean>
<bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="jmsQueueConnectionFactory" />
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="messageQueue" />
</bean>
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="pooledJmsQueueConnectionFactory" />
<property name="pubSubDomain" value="false"/>
</bean>
<bean id="testMessageListener" class="com.example.jms.TestMessageListener" />
<bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" />
<property name="destination" ref="QueueDestination" />
<property name="messageListener" ref="testMessageListener" />
<property name="concurrentConsumers" value="5" />
<property name="acceptMessagesWhileStopping" value="false" />
<property name="recoveryInterval" value="10000" />
<property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>
</beans>
Test JUnit
:
@ContextConfiguration(locations = {"classpath:/activeMQ-context.xml"})
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests {
@Autowired
private JmsTemplate template;
@Autowired
private ActiveMQDestination destination;
@Test
public void testJMSFactory() {
/* sending a message */
template.convertAndSend(destination, "Hi");
/* receiving a message */
Object msg = template.receive(destination);
if (msg instanceof TextMessage) {
try {
System.out.println(((TextMessage) msg).getText());
} catch (JMSException e) {
System.out.println("Error : " + e.getMessage());
}
}
}
}
zależności do dodania the pom.xml
:
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework-version}</version>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.6.0</version>
</dependency>
To się udało, dziękuję. –
Problemy, które widzieliśmy z ActiveMQ (i dlaczego obecnie szukam alternatywy): (a) po zamknięciu nie zatrzymuje wszystkich wątków. (b) wiele z jego API uruchamia usługi w trakcie budowy, mając jednocześnie metody podobne do start(), przez co interfejs API jest niejasny i prawdopodobnie przyczynia się do (a). (c) wiele problemów z wielowątkowością, które do tej pory zostały "naprawione" przez dodanie kolejnych zsynchronizowanych bloków. (d) wątpliwe zabezpieczenia, szczególnie gdy są używane w trybie wykrywania. (Pro-tip: skonfiguruj nieuczciwy serwer kolejki wiadomości na czas zabawy w biurze!) – Trejkaz
Klient WebSphere MQ może wykonywać multicast pub/sub. Zapewnia to zdolność klienta do klienta, która omija menedżera kolejek, chociaż do nawiązywania połączenia wymagany jest menedżer kolejek.
Myślę, że odpowiedzi na http://stackoverflow.com/questions/2507536/lightweight-jms-broker zawierają interesujące informacje (np. [Ffmq] (http://ffmq.sourceforge.net/index) .html) sugestia). ActiveMQ jest kolejnym, aczkolwiek cięższym kandydatem, ale można go również osadzać. – fvu
Jak powiedział @fvu, ActiveMQ jest nieco cięższy niż ZeroMQ, ale działa bardzo dobrze jako proces osadzony. A jeśli używasz Springa, to bardzo łatwo go skonfigurować. –
ZeroMQ działa między innymi nad TCP (nie UDP), który zapewnia niezawodny transport. Czy jednak odnosisz się do trwałej kolejki? To znaczy. z kopią na dysk? –