Próbuję zaimplementować niektóre funkcje na kliencie MQTT w Javie z Eclipse Paho. Celem jest zasubskrybowanie tematu, a po odebraniu wiadomości klient wysyła kolejną wiadomość na inny temat.Jak opublikować wiadomość podczas odbierania na kliencie MQTT Java za pomocą Eclipse Paho
To wygląda bardzo prosto, ale mam dziwny problem, którego nie mogę rozwiązać. Oto mój kod:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttOperations implements MqttCallback {
MqttClient sampleClient;
MqttConnectOptions connOpts;
public MqttOperations() {
}
public static void main(String[] args) throws InterruptedException {
new MqttOperations().launchMqttClient();
}
public void launchMqttClient() throws InterruptedException {
try {
MemoryPersistence persistence = new MemoryPersistence();
sampleClient = new MqttClient("tcp://broker.mqttdashboard.com:1883", "iamaclient", persistence);
connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
sampleClient.subscribe("topic/example/ofmessage");
sampleClient.setCallback(this);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
}
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException
{
System.out.println("Received: " + message.toString());
try{
System.out.println("Publishing message: i am the answer");
MqttMessage ans = new MqttMessage("i am the answer".getBytes());
ans.setQos(2);
sampleClient.publish("topic/example/ofanswer", ans);
System.out.println("Message published");
}catch(MqttException me){
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
Chodzi o to, że ten program działa tylko raz. Po odebraniu wiadomości odpowiedź na tę wiadomość jest wysyłana, ale wydaje się, że komunikat "opublikowana wiadomość" nigdy nie jest wyświetlany na ekranie, a klient nie otrzymuje żadnych innych wiadomości. Mam wrażenie, że linia sampleClient.publish("topic/example/ofanswer", ans);
nigdy nie kończy wykonywania. Czy ktoś wie, jak to się dzieje i jak rozwiązać mój problem?
Kolejna precyzja: Znalazłem kilka źródeł, w których wyjaśniono, że powinienem uważać, aby nie reagować na moją własną odpowiedź, w przeciwnym razie nie będzie działać w sposób oczywisty. Ale myślę, że nie jestem zaniepokojony tym problemem, ponieważ tematy, których używam do subskrybowania i publikowania odpowiedzi, są różne – tben
Myślę, że to problem, który blokujesz w wywołaniu zwrotnym messageArrived. Czy możesz spróbować opublikować w innym wątku (np. Użyć Executora i przesłać polecenie publikowania w wywołaniu zwrotnym messageArrived)? –