How can I tell an ACK corresponds to which publish message on MQTT?

2k views Asked by At

I'm struggling with Mqtt paho driver...

I am using IMqttDeliveryToken to get an Acknowledge from the server whenever my publish has been received.

To compare it with the actual publish message, I set up an ID on the MqttMessage in order to retrieve it from the IMqttDeliveryToken... But it doesn't work... The IMqttDeliveryToken.getMessageId() returns an incorrect ID and when I try to get the ID after a IMqttDeliveryToken.getMessage() when the QoS is different from 0, it returns a NPE.

After reading the Javadoc, I read that it's the usual behavior :

Until the message has been delivered, the message being delivered will be returned. Once the message has been delivered null will be returned.

Which lead me to another question... Is the deliveryComplete() method really called after an Broker sent an Acknowledgement ?

Here is my code :

client.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable thrwbl) { }

    @Override
    public void messageArrived(String string, MqttMessage mm) throws Exception { }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            System.out.println("Message ID from getMessageId() method : " + token.getMessageId());
            MqttMessage message = token.getMessage();
            System.out.println("Message ID from getMessage() method : " + message.getId());
        } catch (MqttException ex) {
            System.out.println(ex);
        } catch (Exception ex) {
            System.out.println(ex);
        }
    }
});

MqttMessage message = new MqttMessage();
message.setId(76);
message.setPayload("pouet".getBytes());
message.setQos(0);

client.publish("TEST", message);

With QoS to 0 :

Message ID from getMessageId() method : 1
Message ID from getMessage() method : 76

With QoS to 1 :

Message ID from getMessageId() method : 1
java.lang.NullPointerException
2

There are 2 answers

1
Nilu On BEST ANSWER

As mentioned in the git MqttMessage.java

/**
 * This is only to be used internally to provide the MQTT id of a message
 * received from the server.  Has no effect when publishing messages.
 * @param messageId
 */
public void setId(int messageId) {
    this.messageId = messageId;
}

This is is no where used while publishing the message. Now To understand why Message ID from getMessageId() method : 1 happened have a look at following.

public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
            MqttPersistenceException {
        final String methodName = "publish";
        //@TRACE 111=< topic={0} message={1}userContext={1} callback={2}
        log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback});

        //Checks if a topic is valid when publishing a message.
        MqttTopic.validate(topic, false/*wildcards NOT allowed*/);

        MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.setMessage(message);
        token.internalTok.setTopics(new String[] {topic});

        MqttPublish pubMsg = new MqttPublish(topic, message);
        comms.sendNoWait(pubMsg, token);

        //@TRACE 112=<
        log.fine(CLASS_NAME,methodName,"112");

        return token;
    }

MqttDeliveryToken does not set the message id here .While publishing MqttPublish instance is created which internally extends multilevel to MqttWireMessage.java and the value is set to 0 by default.

public MqttWireMessage(byte type) {
        this.type = type;
        // Use zero as the default message ID.  Can't use -1, as that is serialized
        // as 65535, which would be a valid ID.
        this.msgId = 0;
    }

When Final Send is called for mqtt publishing in ClientState.java , MqttWireMessage instance (Internally MqttPublish is send from above code) which had msgId = 0 is forwarded due to which the if condition is true and the getNextMessageId() is called which returns 1 (since it is a first message, else it would have returned subsequent value depending on last msg id) and is set to token which in your code you are tracking in deliveryComplete().

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "send";
        if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
            message.setMessageId(getNextMessageId());
        }
        if (token != null ) {
            try {
                token.internalTok.setMessageID(message.getMessageId());
            } catch (Exception e) {
            }
        }

        /////......
    }
0
Nilu On

To Answer your next query : Is the deliveryComplete() method really called after an Broker sent an Acknowledgement --- YES!!

Delivery completed is called from this little piece of code.

private void handleActionComplete(MqttToken token)
            throws MqttException {
        final String methodName = "handleActionComplete";
        synchronized (token) {
            // @TRACE 705=callback and notify for key={0}
            log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() });
            if (token.isComplete()) {
                // Finish by doing any post processing such as delete 
                // from persistent store but only do so if the action
                // is complete
                clientState.notifyComplete(token);
            }

            // Unblock any waiters and if pending complete now set completed
            token.internalTok.notifyComplete();

            if (!token.internalTok.isNotified()) {
                // If a callback is registered and delivery has finished 
                // call delivery complete callback. 
                if ( mqttCallback != null 
                    && token instanceof MqttDeliveryToken 
                    && token.isComplete()) {
                        mqttCallback.deliveryComplete((MqttDeliveryToken) token);
                }
                // Now call async action completion callbacks
                fireActionEvent(token);
            }

            // Set notified so we don't tell the user again about this action.
            if ( token.isComplete() ){
               if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
                    token.internalTok.setNotified(true);
                }
            }



        }
    }

i.e once the ack is received i.e notifyComplete is done, flags are set, the deliveryComplete method is called.