aboutsummaryrefslogtreecommitdiffstats
path: root/analysis/rtt/src/mqtt_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'analysis/rtt/src/mqtt_client.c')
-rw-r--r--analysis/rtt/src/mqtt_client.c114
1 files changed, 83 insertions, 31 deletions
diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c
index 43a06d8..edc394d 100644
--- a/analysis/rtt/src/mqtt_client.c
+++ b/analysis/rtt/src/mqtt_client.c
@@ -3,46 +3,98 @@
3#include <stdio.h> 3#include <stdio.h>
4#include <stdlib.h> 4#include <stdlib.h>
5 5
6#define WINDOW_SEC 1 6#define MQTT_KEEPALIVE_SECONDS 60
7#define MQTT_QOS_LEVEL 0
8#define LOOP_TIMEOUT_MS -1
9#define LOOP_MAX_MESSAGES 1
7 10
11/**
12 * @brief Callback called when a message arrives on a subscribed topic.
13 *
14 * @param mosq Mosquitto client instance.
15 * @param userdata User data pointer (unused).
16 * @param msg Incoming message data.
17 */
8static void on_message(struct mosquitto *mosq, void *userdata, 18static void on_message(struct mosquitto *mosq, void *userdata,
9 const struct mosquitto_message *msg); 19 const struct mosquitto_message *message);
20
21/**
22 * @brief Helper function to handle errors by printing a message,
23 * cleaning up the MQTT client, and returning failure.
24 *
25 * @param client MQTT client instance.
26 * @param error_message Error message to print.
27 * @param mosq_error_code Mosquitto error code.
28 * @return int Always returns 1 to indicate failure.
29 */
30static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
31 int mosq_error_code);
10 32
11int mqtt_client_init(mqtt_client_t *client, const char *broker_address, 33int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
12 int port, const char *topic) { 34 int broker_port, const char *topic)
13 mosquitto_lib_init(); 35{
14 client->mosq = mosquitto_new(NULL, true, NULL); 36 if (!client || !broker_address || !topic)
15 if (!client->mosq) { 37 return fprintf(stderr, "Invalid MQTT init parameters\n"), 1;
16 fprintf(stderr, "Cannot create MQTT client\n"); 38
17 return 1; 39 int init_result = mosquitto_lib_init();
18 } 40 if (init_result != MOSQ_ERR_SUCCESS) {
19 41 fprintf(stderr, "Mosquitto library initialization failed: %s\n",
20 mosquitto_message_callback_set(client->mosq, on_message); 42 mosquitto_strerror(init_result));
21 43 return 1;
22 if (mosquitto_connect(client->mosq, broker_address, port, 60) != 44 }
23 MOSQ_ERR_SUCCESS) { 45
24 fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); 46 client->mosq = mosquitto_new(NULL, true, NULL);
25 return 1; 47 if (!client->mosq)
26 } 48 return fprintf(stderr, "Cannot create MQTT client\n"), 1;
27 49
28 if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { 50 mosquitto_message_callback_set(client->mosq, on_message);
29 fprintf(stderr, "Cannot subscribe to topic\n"); 51
30 return 1; 52 int connect_result = mosquitto_connect(
31 } 53 client->mosq, broker_address, broker_port, MQTT_KEEPALIVE_SECONDS);
32 54 if (connect_result != MOSQ_ERR_SUCCESS)
33 return 0; 55 return fail_with_cleanup(
56 client, "Cannot connect to MQTT broker", connect_result);
57
58 int subscribe_result =
59 mosquitto_subscribe(client->mosq, NULL, topic, MQTT_QOS_LEVEL);
60 if (subscribe_result != MOSQ_ERR_SUCCESS)
61 return fail_with_cleanup(client, "Cannot subscribe to topic",
62 subscribe_result);
63
64 return 0;
34} 65}
35 66
36void mqtt_client_cleanup(mqtt_client_t *client) { 67void mqtt_client_cleanup(mqtt_client_t *client)
37 mosquitto_destroy(client->mosq); 68{
38 mosquitto_lib_cleanup(); 69 if (!client)
70 return;
71
72 if (client->mosq)
73 mosquitto_destroy(client->mosq);
74
75 mosquitto_lib_cleanup();
39} 76}
40 77
41void mqtt_client_loop(mqtt_client_t *client) { 78void mqtt_client_loop(mqtt_client_t *client)
42 mosquitto_loop_forever(client->mosq, -1, 1); 79{
80 if (client && client->mosq)
81 mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS,
82 LOOP_MAX_MESSAGES);
43} 83}
44 84
45static void on_message(struct mosquitto *mosq, void *userdata, 85static void on_message(struct mosquitto *mosq, void *userdata,
46 const struct mosquitto_message *msg) { 86 const struct mosquitto_message *message)
47 logger_handle_message(msg->payload, msg->payloadlen); 87{
88 if (message && message->payload)
89 logger_handle_message(message->payload, message->payloadlen);
90}
91
92static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
93 int mosq_error_code)
94{
95 fprintf(stderr, "%s: %s\n", error_message,
96 mosquitto_strerror(mosq_error_code));
97 if (client && client->mosq)
98 mosquitto_destroy(client->mosq);
99 return 1;
48} 100}