aboutsummaryrefslogtreecommitdiffstats
path: root/analysis/rtt/src/mqtt_client.c
diff options
context:
space:
mode:
authorFilip Wandzio <contact@philw.dev>2025-09-05 03:30:24 +0200
committerFilip Wandzio <contact@philw.dev>2025-09-05 03:30:24 +0200
commit01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8 (patch)
tree33748d0019e3939bd0daf50940407e51d4325a8f /analysis/rtt/src/mqtt_client.c
parent1ba21da6cbc63c0c549fb92731e25bedc482eb51 (diff)
downloade1-01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8.tar.gz
e1-01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8.zip
Standarize the project directory for monorepo-like developer experience
Move the clang formatter to the root of the three so all nested projects could use it Provide README for all other projects Refactor the code in rtt agregator Signed-off-by: Filip Wandzio <contact@philw.dev>
Diffstat (limited to 'analysis/rtt/src/mqtt_client.c')
-rw-r--r--analysis/rtt/src/mqtt_client.c114
1 files changed, 83 insertions, 31 deletions
diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c
index 43a06d8..edc394d 100644
--- a/analysis/rtt/src/mqtt_client.c
+++ b/analysis/rtt/src/mqtt_client.c
@@ -3,46 +3,98 @@
3#include <stdio.h> 3#include <stdio.h>
4#include <stdlib.h> 4#include <stdlib.h>
5 5
6#define WINDOW_SEC 1 6#define MQTT_KEEPALIVE_SECONDS 60
7#define MQTT_QOS_LEVEL 0
8#define LOOP_TIMEOUT_MS -1
9#define LOOP_MAX_MESSAGES 1
7 10
11/**
12 * @brief Callback called when a message arrives on a subscribed topic.
13 *
14 * @param mosq Mosquitto client instance.
15 * @param userdata User data pointer (unused).
16 * @param msg Incoming message data.
17 */
8static void on_message(struct mosquitto *mosq, void *userdata, 18static void on_message(struct mosquitto *mosq, void *userdata,
9 const struct mosquitto_message *msg); 19 const struct mosquitto_message *message);
20
21/**
22 * @brief Helper function to handle errors by printing a message,
23 * cleaning up the MQTT client, and returning failure.
24 *
25 * @param client MQTT client instance.
26 * @param error_message Error message to print.
27 * @param mosq_error_code Mosquitto error code.
28 * @return int Always returns 1 to indicate failure.
29 */
30static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
31 int mosq_error_code);
10 32
11int mqtt_client_init(mqtt_client_t *client, const char *broker_address, 33int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
12 int port, const char *topic) { 34 int broker_port, const char *topic)
13 mosquitto_lib_init(); 35{
14 client->mosq = mosquitto_new(NULL, true, NULL); 36 if (!client || !broker_address || !topic)
15 if (!client->mosq) { 37 return fprintf(stderr, "Invalid MQTT init parameters\n"), 1;
16 fprintf(stderr, "Cannot create MQTT client\n"); 38
17 return 1; 39 int init_result = mosquitto_lib_init();
18 } 40 if (init_result != MOSQ_ERR_SUCCESS) {
19 41 fprintf(stderr, "Mosquitto library initialization failed: %s\n",
20 mosquitto_message_callback_set(client->mosq, on_message); 42 mosquitto_strerror(init_result));
21 43 return 1;
22 if (mosquitto_connect(client->mosq, broker_address, port, 60) != 44 }
23 MOSQ_ERR_SUCCESS) { 45
24 fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); 46 client->mosq = mosquitto_new(NULL, true, NULL);
25 return 1; 47 if (!client->mosq)
26 } 48 return fprintf(stderr, "Cannot create MQTT client\n"), 1;
27 49
28 if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { 50 mosquitto_message_callback_set(client->mosq, on_message);
29 fprintf(stderr, "Cannot subscribe to topic\n"); 51
30 return 1; 52 int connect_result = mosquitto_connect(
31 } 53 client->mosq, broker_address, broker_port, MQTT_KEEPALIVE_SECONDS);
32 54 if (connect_result != MOSQ_ERR_SUCCESS)
33 return 0; 55 return fail_with_cleanup(
56 client, "Cannot connect to MQTT broker", connect_result);
57
58 int subscribe_result =
59 mosquitto_subscribe(client->mosq, NULL, topic, MQTT_QOS_LEVEL);
60 if (subscribe_result != MOSQ_ERR_SUCCESS)
61 return fail_with_cleanup(client, "Cannot subscribe to topic",
62 subscribe_result);
63
64 return 0;
34} 65}
35 66
36void mqtt_client_cleanup(mqtt_client_t *client) { 67void mqtt_client_cleanup(mqtt_client_t *client)
37 mosquitto_destroy(client->mosq); 68{
38 mosquitto_lib_cleanup(); 69 if (!client)
70 return;
71
72 if (client->mosq)
73 mosquitto_destroy(client->mosq);
74
75 mosquitto_lib_cleanup();
39} 76}
40 77
41void mqtt_client_loop(mqtt_client_t *client) { 78void mqtt_client_loop(mqtt_client_t *client)
42 mosquitto_loop_forever(client->mosq, -1, 1); 79{
80 if (client && client->mosq)
81 mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS,
82 LOOP_MAX_MESSAGES);
43} 83}
44 84
45static void on_message(struct mosquitto *mosq, void *userdata, 85static void on_message(struct mosquitto *mosq, void *userdata,
46 const struct mosquitto_message *msg) { 86 const struct mosquitto_message *message)
47 logger_handle_message(msg->payload, msg->payloadlen); 87{
88 if (message && message->payload)
89 logger_handle_message(message->payload, message->payloadlen);
90}
91
92static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
93 int mosq_error_code)
94{
95 fprintf(stderr, "%s: %s\n", error_message,
96 mosquitto_strerror(mosq_error_code));
97 if (client && client->mosq)
98 mosquitto_destroy(client->mosq);
99 return 1;
48} 100}