paho-mqtt won't give me the queued messages

1.7k views Asked by At

Using paho-mqtt and trying to have it receive queued messages. The broker I'm using is emqx 4.2.2 and this is my script:

from paho.mqtt.client import Client, MQTTv5


def on_connect(mqttc, obj, flags, rc, other):
    print("    Session present: " + str(flags['session present']))
    print("    Connection result: " + str(rc))
    mqttc.subscribe([
        ('/message/1', 1)
    ])


def on_message(*args, **kwargs):
    print("received a message")


client = Client(
    client_id='test-client-id',
    protocol=MQTTv5,
)


client.username_pw_set(
    username="test-user-2",
    password="test"
)

client.on_connect = on_connect
client.on_message = on_message

client.connect(
    host='localhost',
    port=1883,
    keepalive=60,
    clean_start=False,
)

client.loop_forever()

I now go and publish a message to the broker:

mosquitto_pub -u test-user-2 -P test -t '/message/1' -m 'this is a message' -q 1 -V mqttv5

While the client is connected to the broker, It does receive the messages but given that I'm subscribing with QoS 1 and that messages are published with QoS 1 I am expecting that if I disconnect my client from the broker, then publish some more QoS 1 messages to that topic and then reconnect my client to the broker using the same fixed client_id, then my client will receive the messages that have been queued while my client was away. Well that's not happening and simulating the same functionality with mosquitto_sub with the -c flag everything works as expected, which leads me to ask myself ... is there a problem with paho-mqtt? Am I doing something wrong?

3

There are 3 answers

0
Liviu On BEST ANSWER

In MQTT v5, clean start means only whether the session is deleted at the start or not. To control how long the session lasts after you disconnect, you need to set the session expiry interval:

import paho.mqtt.properties as properties
...
connect_properties = properties.Properties(properties.PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 3600

client.connect("localhost", 1883, 60, properties=connect_properties)
0
hardillb On

It should be clean_session not clean_start and it needs to be passed to the Client constructor not the connect() function:

...
client = Client(
    client_id='test-client-id',
    protocol=MQTTv5,
    clean_session=False
)


client.username_pw_set(
    username="test-user-2",
    password="test"
)

client.on_connect = on_connect
client.on_message = on_message

client.connect(
    host='localhost',
    port=1883,
    keepalive=60
)
...
1
JD Allen On

QOS only works if the client is SUBSCRIBEd...if you want to receive messages before you are Subscribed, you need to PUBLISH messages with the Retain flag set...and then you only get the last message that was sent. If you really need to get multiple messages, then you need to use something like AMQ, and not MQTT.