mqtt_client cant emit messages on subscribe flutter

1.6k views Asked by At

I am unable to get messages on subscribe. what am I doing wrong?

I am able to register a connection to the broker. I just can't emit any thing in client.updates.listen.

also what is the difference between MqttPublishMessage and MqttSubscribeMessage?

import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

final client = MqttServerClient.withPort('ws://192.168.43.56', 'cliendId', 8080,);

Future<int> main() async {
  /// Set logging on if needed, defaults to off
  client.logging(on: false);
  client.keepAlivePeriod = 20;
  client.port = 8080;
  client.useWebSocket = true;
  client.onDisconnected = onDisconnected;
  client.onConnected = onConnected;
  client.onSubscribed = onSubscribed;
  
  client.pongCallback = pong;
  final connMess = MqttConnectMessage()
      .withClientIdentifier('client_id')
      .keepAliveFor(60) // Must agree with the keep alive set above or not set
      .startClean() // Non persistent session for testing
      .withWillQos(MqttQos.atLeastOnce);
  print('EXAMPLE::Mosquitto client connecting....');
  client.connectionMessage = connMess;

  try {
    await client.connect();
  } on Exception catch (e) {
    print('EXAMPLE::client exception - $e');
    client.disconnect();
  }

  /// Check we are connected
  if (client.connectionStatus.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto client connected');
  } else {
    /// Use status here rather than state if you also want the broker return code.
    print(
        'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    return -1;
  }

  /// Ok, lets try a subscription
  print('EXAMPLE::Subscribing to the test/lol topic');
  const topic = '/busline/201'; // Not a wildcard topic
  client.subscribe(topic, MqttQos.exactlyOnce);

  client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    final MqttPublishMessage recMess = c[0].payload;
    final pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);

    print(
        'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
    print('');
  });

  

  print('EXAMPLE::Sleeping....');
  await MqttUtilities.asyncSleep(120);

}

/// The subscribed callback
void onSubscribed(String topic) {
  print('EXAMPLE::Subscription confirmed for topic $topic');
}

/// The unsolicited disconnect callback
void onDisconnected() {
  print('EXAMPLE::OnDisconnected client callback - Client disconnection');
  if (client.connectionStatus.disconnectionOrigin ==
      MqttDisconnectionOrigin.solicited) {
    print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
  }
}

/// The successful connect callback
void onConnected() {
  print(
      'EXAMPLE::OnConnected client callback - Client connection was sucessful');
}

/// Pong callback
void pong() {
  print('EXAMPLE::Ping response client callback invoked');
}

1

There are 1 answers

0
Vicky Leekha On

you can do like this

// connection succeeded
void onConnected() {
  print('Connected');
}

// unconnected
void onDisconnected() {
  print('Disconnected');
}

// subscribe to topic succeeded
void onSubscribed(String topic) {
  print('Subscribed topic: $topic');
}

// subscribe to topic failed
void onSubscribeFail(String topic) {
  print('Failed to subscribe $topic');
}

// unsubscribe succeeded
void onUnsubscribed(String topic) {
  print('Unsubscribed topic: $topic');
}

// PING response received
void pong() {
  print('Ping response client callback invoked');
}

var data;
Future<MqttServerClient> connect() async {
  MqttServerClient client = MqttServerClient.withPort(
      'broker', "client something unique", port);
  client.logging(on: false);
  client.onConnected = onConnected;
  client.onDisconnected = onDisconnected;
  // client.onUnsubscribed = onUnsubscribed;
  client.onSubscribed = onSubscribed;
  client.onSubscribeFail = onSubscribeFail;
  client.pongCallback = pong;
  final connMessage = MqttConnectMessage()
      .authenticateAs('Enter Client Id', 'Enter Password')
      .withClientIdentifier("something unique match with client")
      .startClean()
      // .withWillRetain()
      .withWillQos(MqttQos.atLeastOnce);
  client.connectionMessage = connMessage;
  try {
    await client.connect();
    // client.unsubscribe('topic/');
   
    client.subscribe('topic/', MqttQos.atLeastOnce);
   

  } catch (e) {
    print('Exception: $e');
    client.disconnect();
  }

  client.updates!.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    final MqttPublishMessage message = c[0].payload as MqttPublishMessage;
    final payload =
        MqttPublishPayload.bytesToStringAsString(message.payload.message);
    data = jsonDecode(payload);

    print('Received message:$payload from topic: ${c[0].topic}>');
  });

 


  return client;
}