How to subscribe mqttClient for particular duration while reading mqtt messages

955 views Asked by At

I am new to MQTT and I have some questions that I hope you guys could help me with. I'm working on a project that will require me to utilize the MQTT protocol and the program needs to be written in java(Just some background info)

Can a MQTT client subscribe for particular time interval? I need to read mqtt messages using eclipse paho client mqttv3 and subscribe to a particular topic for certain duration (e.g. 15 minutes)and read those mqtt messages. Please find below the code which I have tried .

private void initializeConnectionOptions() {
    try {
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setAutomaticReconnect(false);
        mqttConnectOptions.setSocketFactory(SslUtil.getSocketFactory(this.caCrt, this.clientCrt, this.clientKey));
        mqttConnectOptions.setKeepAliveInterval(300);
        mqttConnectOptions.setConnectionTimeout(300);
        mqttClient = new MqttClient("ssl://IP:port", "clientID", memoryPersistence);
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {

            }
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String attribute = "Attribute";
                JSONObject json = new JSONObject(message.toString());
                LOGGER.info("json value is "+ json.toString());
                if (json.toString().contains(attribute)) {
                    int value = json.getInt(attribute);
                    Long sourceTimestamp = json.getLong("sourceTimestamp");
                    String deviceName = json.getString("deviceName");
                    String deviceType = json.getString("deviceType");
                    if (!nodeValueWithDevice.containsKey(deviceName)) {
                        List<Integer> attributeValue = new ArrayList<Integer>();
                        if (!attributeValue.contains(value)) {
                            attributeValue.add(value);
                        }
                        nodeValueWithDevice.put(deviceName, attributeValue);
                    } else {
                        List<Integer> temList = nodeValueWithDevice.get(deviceName);
                        if (!temList.contains(value)) {
                            temList.add(value);
                        }
                        nodeValueWithDevice.put(deviceName, temList);
                    }
                    if (!sourceTimestampWithDevice.containsKey(deviceName)) {
                        List<Long> Time = new ArrayList<Long>();
                        if (!Time.contains(sourceTimestamp)) {
                            Time.add(sourceTimestamp);
                        }
                        sourceTimestampWithDevice.put(deviceName, Time);
                    } else {
                        List<Long> tempList2 = sourceTimestampWithDevice.get(deviceName);
                        if (!tempList2.contains(sourceTimestamp)) {
                            tempList2.add(sourceTimestamp);
                        }
                        sourceTimestampWithDevice.put(deviceName, tempList2);
                    }
                    LOGGER.info(" map of source time stamp is :::" + sourceTimestampWithDevice);
                    LOGGER.info(" map of value is :::" + nodeValueWithDevice);
                }
            }
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
    } catch (MqttException | NoSuchAlgorithmException me) {
        LOGGER.error("Error while connecting to Mqtt broker. Error message {} Error code {}", me.getMessage());
    }
}
public void subscription(String inputTopic) {
    try {
        connectToBroker();
        mqttClient.subscribe(getOutputTopic(inputTopic), 1);
        LOGGER.info("subscription is done::::");
    } catch (Exception e) {
        LOGGER.error("Error while subscribing message to broker", e.getMessage());
        e.printStackTrace();
    }
}
2

There are 2 answers

0
hardillb On

No, the clients all designed to receive all messages for the lifetime of the client connection.

If you only want to be subscribed for a given duration it's up to you to find a way to be be notified when that time has passed and explicitly disconnect the client.

1
AudioBubble On

According to the MQTT specification for both v5.0 and v3.1.1, there is no specified way to only subscribe to a topic for a fixed interval. However, this could be done through your application logic.

In your case, assuming you have full control of the client, you can subscribe to some topic, keep track of the time connected, then after 15 minutes (or whatever interval you specify) send an UNSUBSCRIBE packet for that topic.