Paho MQTT cleanSession set to false yet not receiving messages


I am testing MQTT for a project. While my client is connected, it receives messages on the topic it has subscribed to. I have set QoS to 1 and cleanSession to false, but when the client reconnects it does not receive the messages that were published to the subscribed topic while it was disconnected. In my application, almost all of the work is done by a helper service.

Here is my code

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?>

<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application
    android:allowBackup="true"
    android:icon="@mipmap/ic_launcher"
    android:label="@string/app_name"
    android:theme="@style/AppTheme" >
    <activity
        android:name=".MainActivity"
        android:label="@string/app_name"
        android:screenOrientation="portrait" >
        <intent-filter>
            <action android:name="android.intent.action.MAIN" />

            <category android:name="android.intent.category.LAUNCHER" />
        </intent-filter>
    </activity>

    <service
        android:name=".MqttHelperService"
        android:enabled="true"
        android:exported="true" />

    <!-- MqttService -->
    <service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

MainActivity.java

package com.prateek.mqtttest;

import android.app.Activity;
import android.content.Intent;
import android.os.Bundle;

public class MainActivity extends Activity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        startService(new Intent(getBaseContext(), MqttHelperService.class));
    }
}

MqttHelperService.java

package com.prateek.mqtttest;

import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.widget.Toast;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttHelperService extends Service implements MqttCallback {

    private static final String MQTT_URI = "tcp://broker.mqttdashboard.com:1883";
    private static final String CLIENT_ID = "prateek";
    private static final String MQTT_TOPIC = "mqttmessenger";
    private static final int QOS = 1;
    private MqttAndroidClient client;

    public MqttHelperService() {
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Toast.makeText(this, "MQTT Helper Service Started", Toast.LENGTH_SHORT).show();
        new Thread(new Runnable() {
            @Override
            public void run() {
                connect();
            }
        }, "MqttHelperService").start();
        return START_STICKY;
    }

    public class MqttHelperBinder extends Binder {
        public MqttHelperService getService(){
            return MqttHelperService.this;
        }
    }

    public void connect() {
        client = new MqttAndroidClient(this, MQTT_URI, CLIENT_ID);
        client.setCallback(this);

        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            client.connect(options, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Toast.makeText(getBaseContext(), "connected to MQTT broker", Toast.LENGTH_SHORT).show();
                    subscribe();
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Toast.makeText(getBaseContext(), "failed to connect: " + throwable.getMessage(), Toast.LENGTH_SHORT).show();
                }
            });
        } catch (MqttException e) {
            Toast.makeText(this, "could not connect to MQTT broker at " + MQTT_URI, Toast.LENGTH_SHORT).show();
        }
    }

    public void subscribe() {
        try {
            IMqttToken token = client.subscribe(MQTT_TOPIC, QOS, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Toast.makeText(getBaseContext(), "subscription successful", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Toast.makeText(getBaseContext(), "subscription failed: " + throwable, Toast.LENGTH_SHORT).show();
                }
            });

        } catch (MqttException e) {
            Toast.makeText(this, "could not subscribe", Toast.LENGTH_SHORT).show();
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        Toast.makeText(this, "connection lost", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        Toast.makeText(this, "message received on topic " + s, Toast.LENGTH_SHORT).show();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        Toast.makeText(this, "Service Destroyed", Toast.LENGTH_SHORT).show();
    }


}

I have also read the question "With the clear session flag set to FALSE, I am missing the published values", but I still could not find the error in my code.

There are 2 answers

wyzard (best answer)

I had the same problem recently. The solution turned out to be very simple, but it took me many hours to figure out.

This line was the problem:

client.connect(mqttOptions, mqqtActionListener);

The correct call is:

client.connect(mqttOptions, null, mqqtActionListener);

If you call the connect method with 2 parameters, you are calling this overload, which treats the options object as the user context:

public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException

Instead of the right one:

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException

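With the two-parameter call, your options (including cleanSession set to false) are never applied to the connection. I hope that is your problem too.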
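To make the overload mix-up easier to see, here is a minimal standalone sketch. It does not use the Paho library: Options, Listener and Client are hypothetical stand-ins that only mirror the two connect signatures quoted above, and it assumes the two-parameter overload falls back to default options with cleanSession set to true.

```java
public class OverloadDemo {

    /** Hypothetical stand-in for MqttConnectOptions. */
    static class Options {
        boolean cleanSession = true; // assumed default, as in Paho
    }

    /** Hypothetical stand-in for IMqttActionListener. */
    interface Listener {
        void onSuccess();
    }

    /** Hypothetical client exposing the same two overload shapes. */
    static class Client {

        // Mirrors connect(Object userContext, IMqttActionListener callback):
        // whatever is passed first is only a user context, so fresh
        // default options are used for the connection.
        String connect(Object userContext, Listener callback) {
            return connect(new Options(), userContext, callback);
        }

        // Mirrors connect(MqttConnectOptions options, Object userContext,
        // IMqttActionListener callback): the options are actually applied.
        String connect(Options options, Object userContext, Listener callback) {
            return "cleanSession=" + options.cleanSession;
        }
    }

    public static void main(String[] args) {
        Options options = new Options();
        options.cleanSession = false;

        Client client = new Client();
        Listener listener = () -> { };

        // Two arguments: options is silently accepted as an Object.
        System.out.println(client.connect(options, listener));
        // Three arguments: options is used as connection options.
        System.out.println(client.connect(options, null, listener));
    }
}
```

The compiler accepts both calls because any object can be passed where an Object is expected, which is why the mistake produces no warning.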

ralight

The most likely problem is that the messages being published were QoS=0. Both the subscription and the publish must be QoS>0 in order to be queued for a durable client.
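The rule can be written down as a small toy model. This is only a sketch of the reasoning, not broker code: the class and method names are made up for illustration, and it assumes a message is delivered at the lower of the publish QoS and the subscription QoS. Some brokers may also choose to queue QoS 0 messages, which this model ignores.

```java
public class QosQueueDemo {

    /** Delivery QoS: the lower of the publish QoS and the subscription QoS. */
    static int effectiveQos(int publishQos, int subscriptionQos) {
        return Math.min(publishQos, subscriptionQos);
    }

    /**
     * Whether an offline client can expect the message to be queued:
     * the session must be durable (cleanSession false) and the
     * effective QoS must be greater than 0.
     */
    static boolean queuedWhileOffline(int publishQos, int subscriptionQos,
                                      boolean cleanSession) {
        return !cleanSession && effectiveQos(publishQos, subscriptionQos) > 0;
    }

    public static void main(String[] args) {
        // Publish QoS 0 against a QoS 1 durable subscription: not queued.
        System.out.println(queuedWhileOffline(0, 1, false));
        // Publish QoS 1 against a QoS 1 durable subscription: queued.
        System.out.println(queuedWhileOffline(1, 1, false));
        // Same QoS values, but a clean session: not queued.
        System.out.println(queuedWhileOffline(1, 1, true));
    }
}
```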

You have confirmed that the messages are published with QoS 1, so the next thing to do is test with known working tools, to eliminate possible problems with your code. I tried:

mosquitto_sub -i prateek -t mqttmessenger -h broker.mqttdashboard.com -v -d -c -q 1

And verified that I could receive messages whilst it was connected:

mosquitto_pub -q 1 -t mqttmessenger -m hello2 -h broker.mqttdashboard.com

I then disconnected the mosquitto_sub client and published another message with the same mosquitto_pub command. Reconnecting mosquitto_sub with the same command produced no message, exactly as you see. Repeating the procedure but using test.mosquitto.org as the broker, the behaviour is as expected. It looks like broker.mqttdashboard.com is not configured to allow durable clients.