How to avoid subscribing to the same topic multiple times and firing the callback multiple times in the HiveMQ Android client?

Expected behavior

I want the callback for each topic to fire exactly once per message, no matter how many times I have subscribed to that topic. In other words, even if I subscribe to a topic 1000 times, I want to handle each received message just once.

I don't know whether I am doing something wrong (I guess I am).

Actual behavior

  • I am developing a home security camera app.
  • I have a list of cameras that I own.
  • For every camera on the list, I subscribe to a topic.
  • Every 30 seconds, I refresh the screen and subscribe again to a topic for every camera. This means the same topic can be subscribed to many times.
  • Every time I receive a message on a topic, the callback fires once for each time that topic was subscribed to.

To Reproduce

Steps

  1. Have a topic camera/123
  2. Subscribe to the topic N times with the subscribeWith method shown below
  3. Send a message to camera/123
  4. You will receive the message N times, because you subscribed to the topic N times

Reproducer code

Just variables

private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
// Presumably constructor parameters of the enclosing class (note the trailing comma)
private val serverHost: String,
private val serverPort: Int = 1883

Build the MQTT

private fun build() {
        if (mqtt != null) return

        mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { Timber.d("On Connected") }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()
    }

Connecting the MQTT

fun connect(username: String, password: String) {
        build()

        this.username = username
        this.password = password

        mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username("abc")
                ?.password("123".toByteArray())
                ?.applySimpleAuth()
                ?.send()
    }

Subscribing to a topic

Every time I subscribe to a topic, I use this function:

fun subscribeWith(topic: String) {
        mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_MOST_ONCE)
                ?.callback { t -> onConsumingTopic(t) }  // <- I think this is the important part
                ?.send()
                ?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }
    }

There are 2 answers

Answer by Deneb Chorny (best answer)

I found the correct answer.

There is no need to register a callback on every subscribe call, nor to keep a global array of the registered topics. In other words, this is not needed:

mqtt?.subscribeWith()
    ?.callback { t -> onConsumingTopic(t) }  // <- this is not needed

Instead you could register one "global" callback for all messages, for example:

client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { ... } );

and then you can subscribe without providing a callback.

client.subscribeWith().topicFilter("test").qos(MqttQos.AT_LEAST_ONCE).send();

Complete example:

Building the MQTT

mqtt = Mqtt5Client.builder()
        .identifier(identifier())
        .serverHost(serverHost)
        .serverPort(serverPort)
        .automaticReconnect()
        .applyAutomaticReconnect()
        .addConnectedListener { onMQTTConnected(it) }
        .addDisconnectedListener { onMQTTDisconnected(it) }
        .buildAsync()

// Register one global callback, once, for all messages on subscribed topics
mqtt?.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { onConsumingTopic(it) }

Connecting the MQTT

mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username(context.getString(R.string.mqtt_user))
                ?.password(context.getString(R.string.mqtt_pw).toByteArray())
                ?.applySimpleAuth()
                ?.send()

Subscribing the topic

mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_LEAST_ONCE)
                ?.send()
                ?.whenComplete { ack, error -> onTopicSubscribed(ack, error, topic) }
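
With a single global callback, onConsumingTopic receives the messages for every camera, so it has to work out which camera a message belongs to. Here is a minimal sketch of that routing, assuming topics are shaped like camera/<id> as in the question's camera/123. The names cameraIdFromTopic and CameraMessageRouter are hypothetical and not part of the HiveMQ client. It works on a plain topic string and payload bytes; with the HiveMQ client these would presumably come from publish.topic.toString() and publish.payloadAsBytes.

```kotlin
// Hypothetical helper: extracts the camera id from a topic shaped like "camera/<id>".
// Returns null for any topic that does not match that shape.
fun cameraIdFromTopic(topic: String): String? {
    val parts = topic.split("/")
    return if (parts.size == 2 && parts[0] == "camera" && parts[1].isNotEmpty()) parts[1] else null
}

// Hypothetical dispatcher: holds at most one handler per camera id.
class CameraMessageRouter {
    private val handlers = mutableMapOf<String, (ByteArray) -> Unit>()

    // Registering the same camera again replaces the handler instead of stacking a second one.
    fun register(cameraId: String, handler: (ByteArray) -> Unit) {
        handlers[cameraId] = handler
    }

    // Invokes the matching handler once; returns false if the topic is unknown.
    fun dispatch(topic: String, payload: ByteArray): Boolean {
        val id = cameraIdFromTopic(topic) ?: return false
        val handler = handlers[id] ?: return false
        handler(payload)
        return true
    }
}
```

Because the handlers live in a map keyed by camera id, refreshing the screen every 30 seconds and registering again does not multiply the callbacks.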

Answer by hardillb

As mentioned in the comments, the only solution at the moment is to keep a list of the subscribed topics outside the MQTT client library and check it before subscribing to new topics.
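
A minimal sketch of that bookkeeping, assuming a hypothetical SubscriptionRegistry class (not part of the HiveMQ client) that wraps whatever function actually sends the SUBSCRIBE:

```kotlin
// Hypothetical helper, not part of the HiveMQ client: remembers which topics
// have already been subscribed so that each one is subscribed only once.
class SubscriptionRegistry(private val doSubscribe: (String) -> Unit) {
    private val subscribed = mutableSetOf<String>()

    // Returns true if a subscribe was actually issued, false if it was skipped.
    @Synchronized
    fun subscribeOnce(topic: String): Boolean {
        if (!subscribed.add(topic)) return false
        doSubscribe(topic)
        return true
    }

    // Forget a topic (for example after unsubscribing) so it can be subscribed again.
    @Synchronized
    fun forget(topic: String) {
        subscribed.remove(topic)
    }
}

// Demo with a fake subscribe function that only records the topics it was asked for.
fun demoSubscribeOnce(): List<String> {
    val sent = mutableListOf<String>()
    val registry = SubscriptionRegistry { topic -> sent.add(topic) }
    repeat(3) { registry.subscribeOnce("camera/123") }
    registry.subscribeOnce("camera/456")
    return sent
}
```

In the question's code, the registry could be created as SubscriptionRegistry { topic -> subscribeWith(topic) }, and the 30-second refresh would call subscribeOnce instead of subscribeWith. The sketch marks a topic as subscribed before the broker acknowledges it; a stricter version would call forget when the subscribe fails.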