2015-07-01 10 views
6

Próbuję zaimplementować niektóre funkcje na kliencie MQTT w Javie z Eclipse Paho. Celem jest zasubskrybowanie tematu, a po odebraniu wiadomości klient wysyła kolejną wiadomość na inny temat.Jak opublikować wiadomość podczas odbierania na kliencie MQTT Java za pomocą Eclipse Paho

To wygląda bardzo prosto, ale mam dziwny problem, którego nie mogę rozwiązać. Oto mój kod:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 

public class MqttOperations implements MqttCallback { 

    MqttClient sampleClient; 
    MqttConnectOptions connOpts; 

    public MqttOperations() { 
    } 

    public static void main(String[] args) throws InterruptedException { 
     new MqttOperations().launchMqttClient(); 
    } 


    public void launchMqttClient() throws InterruptedException { 
     try { 
       MemoryPersistence persistence = new MemoryPersistence(); 
       sampleClient = new MqttClient("tcp://broker.mqttdashboard.com:1883", "iamaclient", persistence); 
       connOpts = new MqttConnectOptions(); 
       connOpts.setCleanSession(true); 
       sampleClient.connect(connOpts); 
       sampleClient.subscribe("topic/example/ofmessage"); 
       sampleClient.setCallback(this); 

      } catch(MqttException me) { 
       System.out.println("reason "+me.getReasonCode()); 
       System.out.println("msg "+me.getMessage()); 
       System.out.println("loc "+me.getLocalizedMessage()); 
       System.out.println("cause "+me.getCause()); 
       System.out.println("excep "+me); 
       me.printStackTrace(); 
      } 
    } 


    @Override 
    public void connectionLost(Throwable cause) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws MqttException 
    { 
     System.out.println("Received: " + message.toString()); 
     try{ 
      System.out.println("Publishing message: i am the answer"); 
      MqttMessage ans = new MqttMessage("i am the answer".getBytes()); 
      ans.setQos(2); 
      sampleClient.publish("topic/example/ofanswer", ans); 
      System.out.println("Message published"); 

     }catch(MqttException me){ 
       System.out.println("reason "+me.getReasonCode()); 
       System.out.println("msg "+me.getMessage()); 
       System.out.println("loc "+me.getLocalizedMessage()); 
       System.out.println("cause "+me.getCause()); 
       System.out.println("excep "+me); 
       me.printStackTrace(); 
     } 

    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 

    } 

} 

Chodzi o to, że ten program działa tylko raz. Po odebraniu wiadomości odpowiedź na tę wiadomość jest wysyłana, ale wydaje się, że komunikat "opublikowana wiadomość" nigdy nie jest wyświetlany na ekranie, a klient nie otrzymuje żadnych innych wiadomości. Mam wrażenie, że linia sampleClient.publish("topic/example/ofanswer", ans); nigdy nie kończy wykonywania. Czy ktoś wie, jak to się dzieje i jak rozwiązać mój problem?

+0

Kolejna precyzja: Znalazłem kilka źródeł, w których wyjaśniono, że powinienem uważać, aby nie reagować na moją własną odpowiedź, w przeciwnym razie nie będzie działać w sposób oczywisty. Ale myślę, że nie jestem zaniepokojony tym problemem, ponieważ tematy, których używam do subskrybowania i publikowania odpowiedzi, są różne – tben

+0

Myślę, że to problem, który blokujesz w wywołaniu zwrotnym messageArrived. Czy możesz spróbować opublikować w innym wątku (np. Użyć Executora i przesłać polecenie publikowania w wywołaniu zwrotnym messageArrived)? –

Odpowiedz

2

Miałem podobny problem dzisiaj. Po przeczytaniu an other question with two connections otrzymałem: Potrzebne są dwie instancje MqttClient. Jeden do publikacji i jeden do subskrybowania. Niestety nie znalazłem dokumentacji dla tego faktu.

Przy okazji. W mojej pierwszej implementacji z dwoma klientami dałem im te same identyfikatory (logicznie powinno to być to samo połączenie). Ale drugie połączenie rozłącza pierwsze. Kiedy zacząłem używać dwóch różnych identyfikatorów, zaczyna działać.

+0

Naprawdę mi pomogło. :) –

+0

To przeszkadza mi, jeśli czytasz dokument http://www.eclipse.org/paho/files/javadoc/index.html dla messageArrived(), ponieważ nie powinieneś mieć dwóch połączeń, jednak działa tak okrzyki dla ciebie! – Clocker

0

Dominik Obermaier ma rację: problem polega na tym, że blokujesz w messageArrived. Konkretnie, MqttClient.publish czeka aż zostanie odebrane powiadomienie o dostarczeniu wiadomości - ale wątek roboczy MqttClient nigdy go nie odzyska, ponieważ siedzi w oczekiwaniu na samo powiadomienie w messageArrived!

Rozwiązanie dla dwóch klientów działa, ponieważ wątek wątku innego klienta może pobrać powiadomienie z gniazda, ale właściwym rozwiązaniem jest opublikowanie z QoS 0 z poziomu messageArrived (ponieważ komunikaty QoS 0 nie wymagają potwierdzenia dostarczenia;) lub użyj interfejsu API, który nie czeka na dostarczenie wiadomości, takiego jak MqttTopic.publish.