When connecting using MQTTv5, Mosquitto does not automatically reconnect after the first connection failure

148 views Asked by At

I am using Mosquitto version 2.0.18 for MQTT connections. When using MQTTv5, if the initial connection attempt fails, Mosquitto does not automatically attempt a reconnection. The initialization code is as follows:

/*
 * Client initialization fragment (body of a function returning int; the
 * enclosing signature is not shown).
 *
 * Steps: create the client, select MQTT v5, start the network loop thread,
 * register credentials, v5 callbacks and the reconnect delay, then request
 * an asynchronous connection. The request is retried every 3 seconds until
 * it is accepted or rejected with MOSQ_ERR_INVAL.
 */
char sn[64] = {0};
mqtt_prop_sn_get(NULL, sn, sizeof(sn));
g_mosq = mosquitto_new(sn, session, NULL);
if (!g_mosq)
{
    MOCAR_LOG_ERROR("create client fail");
    return -1;
}
mosquitto_int_option(g_mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
int loop = mosquitto_loop_start(g_mosq);
if (loop != MOSQ_ERR_SUCCESS)
{
    MOCAR_LOG_ERROR_FMT("mosquitto loop error");
    huali_plf_mqtt_cleanup();
    return -1;
}
/*
 * NOTE(review): the calls below use `mosq`, while the client created above
 * is stored in `g_mosq` - confirm both refer to the same handle.
 */
mosquitto_username_pw_set(mosq, g_mqtt_info.user_id, g_mqtt_info.password);
mosquitto_connect_v5_callback_set(mosq, my_connect_v5_callback);
mosquitto_message_v5_callback_set(mosq, my_message_v5_callback);
mosquitto_subscribe_v5_callback_set(mosq, my_subscribe_v5_callback);
mosquitto_disconnect_v5_callback_set(mosq, my_disconnect_v5_callback);
mosquitto_reconnect_delay_set(mosq, 5, 15, true);

/*
 * NOTE(review): the credentials above come from `g_mqtt_info`, the address
 * and port below from `mqtt_info` - confirm this is intentional.
 */
while (1)
{
    ret = mosquitto_connect_async(g_mosq, mqtt_info.address, mqtt_info.port, KEEP_ALIVE);
    if (MOSQ_ERR_SUCCESS == ret)
    {
        MOCAR_LOG_DEBUG_FMT("mosquitto_connect_async success");
        break;
    }
    else if (MOSQ_ERR_INVAL == ret)
    {
        /* Retrying cannot help when the arguments themselves are rejected. */
        MOCAR_LOG_DEBUG_FMT("the input parameters were invalid");
        break;
    }
    else
    {
        /*
         * `ret` is a Mosquitto error code, so report it with
         * mosquitto_strerror(); errno is only meaningful when ret is
         * MOSQ_ERR_ERRNO, and may be stale for any other failure.
         */
        MOCAR_LOG_DEBUG_FMT("mqtt connect failed: %s", mosquitto_strerror(ret));
        sleep(3);
        continue;
    }
}

Later, I tried using a timer to reconnect manually every 30 seconds, which successfully re-establishes the connection. However, every call to mosquitto_loop_start then leaks 8 MB of memory. Here is my reconnection code:

/*
 * Reconnect check.
 *
 * Does nothing when cv2x_plf_mqtt_get_send_flag() returns 1. Otherwise the
 * current session is disconnected, the loop thread is stopped (forced), and
 * after a one-second pause the credentials are set again, the connect
 * handler is run and a new loop thread is started. A failure to start the
 * loop thread is logged.
 */
void mqtt_check_reconnect(void)
{
    if (1 == cv2x_plf_mqtt_get_send_flag())
    {
        return;
    }

    /* Tear down the existing session and its loop thread. */
    mosquitto_disconnect(g_mosq);
    mosquitto_loop_stop(g_mosq, true);
    sleep(1);

    /* Set up the connection again on the same client handle. */
    mosquitto_username_pw_set(g_mosq, g_mqtt_info.user_id, g_mqtt_info.password);
    cv2x_plf_mqtt_connect_handle();

    int start_rc = mosquitto_loop_start(g_mosq);
    if (0 != start_rc)
    {
        MOCAR_LOG_ERROR_FMT("mosquitto_loop_start failed, rc is %s", mosquitto_strerror(start_rc));
    }
}
1

There is 1 answer

1
buerwang On

The problem has been identified. When the loop thread finishes, mosquitto__thread_main resets mosq->threaded to mosq_ts_none. A later call to mosquitto_loop_stop then returns immediately, because it finds mosq->threaded != mosq_ts_self, so none of its thread-related cleanup is performed.

As a workaround, you can modify the Mosquitto source code and comment out the lines that reset mosq->threaded to mosq_ts_none, like this:

/*
 * Thread routine that runs the client's network loop.
 *
 * obj: pointer to the struct mosquitto client.
 * Returns NULL when obj is NULL; otherwise returns obj once
 * mosquitto_loop_forever() has returned.
 */
void *mosquitto__thread_main(void *obj)
{
    struct mosquitto *mosq = obj;
#ifndef WIN32
    /* 10 ms pause between state polls. */
    struct timespec ts = { .tv_sec = 0, .tv_nsec = 10000000 };
#endif

    if(!mosq) return NULL;

    /* Wait in 10 ms steps until the client has left the mosq_cs_new state. */
    while(mosquitto__get_state(mosq) == mosq_cs_new){
#ifdef WIN32
        Sleep(10);
#else
        nanosleep(&ts, NULL);
#endif
    }

    if(mosq->keepalive){
        /* Use the keepalive value (seconds -> ms) as the loop timeout. */
        mosquitto_loop_forever(mosq, mosq->keepalive*1000, 1);
    }else{
        /* Keepalive disabled: use a timeout of one day. */
        mosquitto_loop_forever(mosq, 1000*86400, 1);
    }

    /*
     * Local modification: the reset of mosq->threaded on thread exit is
     * intentionally disabled. NOTE(review): the stated reason is that
     * mosquitto_loop_stop() bails out when mosq->threaded != mosq_ts_self -
     * confirm against the library version in use. Disabled code:
     *
     *     if(mosq->threaded == mosq_ts_self){
     *         mosq->threaded = mosq_ts_none;
     *     }
     */

    return obj;
}
#endif