#include "mqtt_client.h"
#include "logger.h"

/* NOTE(review): the two system header names were missing from the directives
 * in this file; they are reconstructed from the symbols used below
 * (mosquitto_* and fprintf/stderr) - confirm against the original. */
#include <mosquitto.h>
#include <stdio.h>

/* Keepalive interval passed to mosquitto_connect(), in seconds. */
#define MQTT_KEEPALIVE_SECONDS 60
/* QoS level requested when subscribing to the topic. */
#define MQTT_QOS_LEVEL 0
/* Timeout argument for mosquitto_loop_forever(); parenthesized so the negative
 * literal cannot combine with neighbouring operators after expansion. */
#define LOOP_TIMEOUT_MS (-1)
/* max_packets argument for mosquitto_loop_forever(). */
#define LOOP_MAX_MESSAGES 1

/**
 * @brief Callback called when a message arrives on a subscribed topic.
 *
 * @param mosq     Mosquitto client instance (unused).
 * @param userdata User data pointer (unused).
 * @param message  Incoming message data.
 */
static void on_message(struct mosquitto *mosq, void *userdata,
                       const struct mosquitto_message *message);

/**
 * @brief Helper function to handle errors by printing a message,
 *        destroying the MQTT client handle, and returning failure.
 *
 * @param client          MQTT client instance.
 * @param error_message   Error message to print.
 * @param mosq_error_code Mosquitto error code.
 * @return int Always returns 1 to indicate failure.
 */
static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
                             int mosq_error_code);

/**
 * @brief Initialize the mosquitto library, create a client, connect to the
 *        broker and subscribe to a topic.
 *
 * On every failure path client->mosq is left NULL, so a subsequent
 * mqtt_client_cleanup() never touches a stale handle.
 *
 * @param client         Client structure to initialize.
 * @param broker_address Broker host name or address.
 * @param broker_port    Broker TCP port.
 * @param topic          Topic to subscribe to.
 * @return int 0 on success, 1 on failure (a diagnostic is written to stderr).
 */
int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
                     int broker_port, const char *topic)
{
    if (!client || !broker_address || !topic) {
        fprintf(stderr, "Invalid MQTT init parameters\n");
        return 1;
    }

    /* Put the handle in a known state before anything can fail. */
    client->mosq = NULL;

    int init_result = mosquitto_lib_init();
    if (init_result != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "Mosquitto library initialization failed: %s\n",
                mosquitto_strerror(init_result));
        return 1;
    }

    client->mosq = mosquitto_new(NULL, true, NULL);
    if (!client->mosq) {
        fprintf(stderr, "Cannot create MQTT client\n");
        return 1;
    }

    mosquitto_message_callback_set(client->mosq, on_message);

    int connect_result = mosquitto_connect(client->mosq, broker_address,
                                           broker_port, MQTT_KEEPALIVE_SECONDS);
    if (connect_result != MOSQ_ERR_SUCCESS) {
        return fail_with_cleanup(client, "Cannot connect to MQTT broker",
                                 connect_result);
    }

    int subscribe_result = mosquitto_subscribe(client->mosq, NULL, topic,
                                               MQTT_QOS_LEVEL);
    if (subscribe_result != MOSQ_ERR_SUCCESS) {
        return fail_with_cleanup(client, "Cannot subscribe to topic",
                                 subscribe_result);
    }

    return 0;
}

/**
 * @brief Destroy the client handle (if any) and release the mosquitto library.
 *
 * The handle is reset to NULL after destruction so a repeated call, or a call
 * after a failed mqtt_client_init(), does not destroy it twice.
 *
 * @param client Client to clean up; NULL is ignored.
 */
void mqtt_client_cleanup(mqtt_client_t *client)
{
    if (!client) {
        return;
    }

    if (client->mosq) {
        mosquitto_destroy(client->mosq);
        client->mosq = NULL;
    }

    mosquitto_lib_cleanup();
}

/**
 * @brief Run the mosquitto network loop via mosquitto_loop_forever().
 *
 * Does nothing when the client or its handle is NULL. If the loop returns
 * with an error code, the reason is reported on stderr.
 *
 * @param client Initialized client.
 */
void mqtt_client_loop(mqtt_client_t *client)
{
    if (!client || !client->mosq) {
        return;
    }

    int loop_result = mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS,
                                             LOOP_MAX_MESSAGES);
    if (loop_result != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "MQTT loop terminated: %s\n",
                mosquitto_strerror(loop_result));
    }
}

static void on_message(struct mosquitto *mosq, void *userdata,
                       const struct mosquitto_message *message)
{
    /* Required by the callback signature but not needed here. */
    (void)mosq;
    (void)userdata;

    /* Messages without a payload are skipped. */
    if (message && message->payload) {
        logger_handle_message(message->payload, message->payloadlen);
    }
}

static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
                             int mosq_error_code)
{
    fprintf(stderr, "%s: %s\n", error_message,
            mosquitto_strerror(mosq_error_code));

    if (client && client->mosq) {
        mosquitto_destroy(client->mosq);
        /* Clear the handle so mqtt_client_cleanup() cannot destroy it again.
         * mosquitto_lib_cleanup() is not called here; mqtt_client_cleanup()
         * does that. */
        client->mosq = NULL;
    }

    return 1;
}