From 01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8 Mon Sep 17 00:00:00 2001 From: Filip Wandzio Date: Fri, 5 Sep 2025 03:30:24 +0200 Subject: Standarize the project directory for monorepo-like developer experience Move the clang formatter to the root of the three so all nested projects could use it Provide README for all other projects Refactor the code in rtt agregator Signed-off-by: Filip Wandzio --- analysis/rtt/src/mqtt_client.c | 114 ++++++++++++++++++++++++++++++----------- 1 file changed, 83 insertions(+), 31 deletions(-) (limited to 'analysis/rtt/src/mqtt_client.c') diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c index 43a06d8..edc394d 100644 --- a/analysis/rtt/src/mqtt_client.c +++ b/analysis/rtt/src/mqtt_client.c @@ -3,46 +3,98 @@ #include #include -#define WINDOW_SEC 1 +#define MQTT_KEEPALIVE_SECONDS 60 +#define MQTT_QOS_LEVEL 0 +#define LOOP_TIMEOUT_MS -1 +#define LOOP_MAX_MESSAGES 1 +/** + * @brief Callback called when a message arrives on a subscribed topic. + * + * @param mosq Mosquitto client instance. + * @param userdata User data pointer (unused). + * @param msg Incoming message data. + */ static void on_message(struct mosquitto *mosq, void *userdata, - const struct mosquitto_message *msg); + const struct mosquitto_message *message); + +/** + * @brief Helper function to handle errors by printing a message, + * cleaning up the MQTT client, and returning failure. + * + * @param client MQTT client instance. + * @param error_message Error message to print. + * @param mosq_error_code Mosquitto error code. + * @return int Always returns 1 to indicate failure. + */ +static int fail_with_cleanup(mqtt_client_t *client, const char *error_message, + int mosq_error_code); int mqtt_client_init(mqtt_client_t *client, const char *broker_address, - int port, const char *topic) { - mosquitto_lib_init(); - client->mosq = mosquitto_new(NULL, true, NULL); - if (!client->mosq) { - fprintf(stderr, "Cannot create MQTT client\n"); - return 1; - } - - mosquitto_message_callback_set(client->mosq, on_message); - - if (mosquitto_connect(client->mosq, broker_address, port, 60) != - MOSQ_ERR_SUCCESS) { - fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); - return 1; - } - - if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { - fprintf(stderr, "Cannot subscribe to topic\n"); - return 1; - } - - return 0; + int broker_port, const char *topic) +{ + if (!client || !broker_address || !topic) + return fprintf(stderr, "Invalid MQTT init parameters\n"), 1; + + int init_result = mosquitto_lib_init(); + if (init_result != MOSQ_ERR_SUCCESS) { + fprintf(stderr, "Mosquitto library initialization failed: %s\n", + mosquitto_strerror(init_result)); + return 1; + } + + client->mosq = mosquitto_new(NULL, true, NULL); + if (!client->mosq) + return fprintf(stderr, "Cannot create MQTT client\n"), 1; + + mosquitto_message_callback_set(client->mosq, on_message); + + int connect_result = mosquitto_connect( + client->mosq, broker_address, broker_port, MQTT_KEEPALIVE_SECONDS); + if (connect_result != MOSQ_ERR_SUCCESS) + return fail_with_cleanup( + client, "Cannot connect to MQTT broker", connect_result); + + int subscribe_result = + mosquitto_subscribe(client->mosq, NULL, topic, MQTT_QOS_LEVEL); + if (subscribe_result != MOSQ_ERR_SUCCESS) + return fail_with_cleanup(client, "Cannot subscribe to topic", + subscribe_result); + + return 0; } -void mqtt_client_cleanup(mqtt_client_t *client) { - mosquitto_destroy(client->mosq); - mosquitto_lib_cleanup(); +void mqtt_client_cleanup(mqtt_client_t *client) +{ + if (!client) + return; + + if (client->mosq) + mosquitto_destroy(client->mosq); + + mosquitto_lib_cleanup(); } -void mqtt_client_loop(mqtt_client_t *client) { - mosquitto_loop_forever(client->mosq, -1, 1); +void mqtt_client_loop(mqtt_client_t *client) +{ + if (client && client->mosq) + mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS, + LOOP_MAX_MESSAGES); } static void on_message(struct mosquitto *mosq, void *userdata, - const struct mosquitto_message *msg) { - logger_handle_message(msg->payload, msg->payloadlen); + const struct mosquitto_message *message) +{ + if (message && message->payload) + logger_handle_message(message->payload, message->payloadlen); +} + +static int fail_with_cleanup(mqtt_client_t *client, const char *error_message, + int mosq_error_code) +{ + fprintf(stderr, "%s: %s\n", error_message, + mosquitto_strerror(mosq_error_code)); + if (client && client->mosq) + mosquitto_destroy(client->mosq); + return 1; } -- cgit v1.2.3