2013-08-12 22 views
7

Mamy do czynienia z przypadkowym problemem z ActiveMQ i jego konsumentami. Zauważamy, że niewielu użytkowników nie otrzymuje wiadomości, mimo że są oni podłączeni do kolejki ActiveMQ. Ale działa dobrze po ponownym uruchomieniu klienta.Konsument nie otrzymuje wiadomości od ActiveMQ

Mamy kolejkę o nazwie testQueue po stronie ActiveMQ. Konsument próbuje usunąć kolejkowane wiadomości z tej kolejki. W tym celu używamy Spring DefaultMessageListenerContainer. Wiadomość jest dostarczana do węzła konsumenta z brokera ActiveMQ. Z tcpdump również było oczywiste, że wiadomość dociera do węzła konsumenta, ale faktyczny kod konsumenta nie jest w stanie zobaczyć komunikatu. Innymi słowy, wiadomość wydaje się być zablokowana w kodzie konsumenta ActiveMQ lub w SpringManageMessageListenerContainer.

Zobacz ilustrację poniżej. dla większej jasności w tej kwestii. Wiadomość dociera do węzła konsumenta, ale nie dociera do "rzeczywistej klasy konsumenckiej", co oznacza, że ​​wiadomość utknęła w kodzie konsumenta AMQ lub w źródłowym DMLC.

enter image description here

Poniżej znajdują się szczegóły uchwycone z ActiveMQ administratora.

Queue-Name/Oczekuje-Message-Count/Consumer-Count/Wiadomości/Wiadomości-skolejkowany-rozkolejkowywana testQueue/9/1/9/0

Poniżej znajduje się więcej szczegółów.

połączenie ID/SessionID/selektor/kolejkuje/Dequeues/sprawdź/sprawdź kolejką/Prefetch ID: bearsvir52-45176-1375519181268-3: 5/1// 9/0/9/9/250

Z drugiej tabeli wiadomo, że wiadomości są dostarczane konsumentowi, ale konsument nie potwierdza wiadomości. W związku z tym wiadomości są zablokowane w Wywoływaniu kolejki po stronie brokera.

kilka punktów do powiadomienia dla:

1) Nie ma różnicy czasu b/w węźle Broker i węzła konsumentów.

2) Obserwowano tcpdump po stronie konsumenta. Widzimy, że pakiet MessageDispatch (Openwire) jest przenoszony do węzła konsumenta, ale nie mógł znaleźć tego samego komunikatu MessageAck (Openwire).

3) Czasami działa na węźle, a czasami tworzy problem na tym samym węźle.

+0

będzie można wysłać maksymalnie config sprężyny, która pokazuje ConectionFactory, DMLC i klasę słuchacza? –

+0

Mam dokładnie ten sam problem. Czy masz rozdzielczość? –

+0

Dowolna aktualizacja? Mam podobny problem, – Nereis

Odpowiedz

2

Zajęło dużo czasu, aby znaleźć rozwiązanie. Wydaje się, że występuje problem z klasą org.apache.activemq.ActiveMQConnection.java, w przypadku niepowodzenia AMQ. Obiekt połączenia nie jest w takich przypadkach uruchamiany po stronie konsumenta.

Poniżej znajduje się poprawka, którą dodałem w pliku ActiveMQConnection.java i skompilowałem źródła, aby utworzyć activemq-core-x.x.x.jar

private final Object startMutex = new Object(); 

dodany czek metody createSession

public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 
    synchronized (startMutex) { 
     if(!isStarted()) { 
      start(); 
     } 
    } 
1

Jedną z przyczyn może być niepoprawne użycie urządzenia CachingConnectionFactory (z konsumentami buforowanymi) z kontenerem słuchacza, który dynamicznie dostosowuje konsumentów (maksymalna liczba konsumentów> konsumentów). Możesz skończyć z konsumentem z pamięci podręcznej, który siedzi w basenie i nie jest aktywnie wykorzystywany. Nigdy nie musisz buforować klientów za pomocą kontenera nasłuchującego.

W przypadku takich problemów, generalnie zaleca się uruchamianie z rejestrowaniem TRACE i można zobaczyć całą aktywność konsumenta.