diff options
Diffstat (limited to 'analysis/rtt/src/mqtt_client.c')
| -rw-r--r-- | analysis/rtt/src/mqtt_client.c | 114 |
1 files changed, 83 insertions, 31 deletions
diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c index 43a06d8..edc394d 100644 --- a/analysis/rtt/src/mqtt_client.c +++ b/analysis/rtt/src/mqtt_client.c | |||
| @@ -3,46 +3,98 @@ | |||
| 3 | #include <stdio.h> | 3 | #include <stdio.h> |
| 4 | #include <stdlib.h> | 4 | #include <stdlib.h> |
| 5 | 5 | ||
| 6 | #define WINDOW_SEC 1 | 6 | #define MQTT_KEEPALIVE_SECONDS 60 |
| 7 | #define MQTT_QOS_LEVEL 0 | ||
| 8 | #define LOOP_TIMEOUT_MS -1 | ||
| 9 | #define LOOP_MAX_MESSAGES 1 | ||
| 7 | 10 | ||
| 11 | /** | ||
| 12 | * @brief Callback called when a message arrives on a subscribed topic. | ||
| 13 | * | ||
| 14 | * @param mosq Mosquitto client instance. | ||
| 15 | * @param userdata User data pointer (unused). | ||
| 16 | * @param msg Incoming message data. | ||
| 17 | */ | ||
| 8 | static void on_message(struct mosquitto *mosq, void *userdata, | 18 | static void on_message(struct mosquitto *mosq, void *userdata, |
| 9 | const struct mosquitto_message *msg); | 19 | const struct mosquitto_message *message); |
| 20 | |||
| 21 | /** | ||
| 22 | * @brief Helper function to handle errors by printing a message, | ||
| 23 | * cleaning up the MQTT client, and returning failure. | ||
| 24 | * | ||
| 25 | * @param client MQTT client instance. | ||
| 26 | * @param error_message Error message to print. | ||
| 27 | * @param mosq_error_code Mosquitto error code. | ||
| 28 | * @return int Always returns 1 to indicate failure. | ||
| 29 | */ | ||
| 30 | static int fail_with_cleanup(mqtt_client_t *client, const char *error_message, | ||
| 31 | int mosq_error_code); | ||
| 10 | 32 | ||
| 11 | int mqtt_client_init(mqtt_client_t *client, const char *broker_address, | 33 | int mqtt_client_init(mqtt_client_t *client, const char *broker_address, |
| 12 | int port, const char *topic) { | 34 | int broker_port, const char *topic) |
| 13 | mosquitto_lib_init(); | 35 | { |
| 14 | client->mosq = mosquitto_new(NULL, true, NULL); | 36 | if (!client || !broker_address || !topic) |
| 15 | if (!client->mosq) { | 37 | return fprintf(stderr, "Invalid MQTT init parameters\n"), 1; |
| 16 | fprintf(stderr, "Cannot create MQTT client\n"); | 38 | |
| 17 | return 1; | 39 | int init_result = mosquitto_lib_init(); |
| 18 | } | 40 | if (init_result != MOSQ_ERR_SUCCESS) { |
| 19 | 41 | fprintf(stderr, "Mosquitto library initialization failed: %s\n", | |
| 20 | mosquitto_message_callback_set(client->mosq, on_message); | 42 | mosquitto_strerror(init_result)); |
| 21 | 43 | return 1; | |
| 22 | if (mosquitto_connect(client->mosq, broker_address, port, 60) != | 44 | } |
| 23 | MOSQ_ERR_SUCCESS) { | 45 | |
| 24 | fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); | 46 | client->mosq = mosquitto_new(NULL, true, NULL); |
| 25 | return 1; | 47 | if (!client->mosq) |
| 26 | } | 48 | return fprintf(stderr, "Cannot create MQTT client\n"), 1; |
| 27 | 49 | ||
| 28 | if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { | 50 | mosquitto_message_callback_set(client->mosq, on_message); |
| 29 | fprintf(stderr, "Cannot subscribe to topic\n"); | 51 | |
| 30 | return 1; | 52 | int connect_result = mosquitto_connect( |
| 31 | } | 53 | client->mosq, broker_address, broker_port, MQTT_KEEPALIVE_SECONDS); |
| 32 | 54 | if (connect_result != MOSQ_ERR_SUCCESS) | |
| 33 | return 0; | 55 | return fail_with_cleanup( |
| 56 | client, "Cannot connect to MQTT broker", connect_result); | ||
| 57 | |||
| 58 | int subscribe_result = | ||
| 59 | mosquitto_subscribe(client->mosq, NULL, topic, MQTT_QOS_LEVEL); | ||
| 60 | if (subscribe_result != MOSQ_ERR_SUCCESS) | ||
| 61 | return fail_with_cleanup(client, "Cannot subscribe to topic", | ||
| 62 | subscribe_result); | ||
| 63 | |||
| 64 | return 0; | ||
| 34 | } | 65 | } |
| 35 | 66 | ||
| 36 | void mqtt_client_cleanup(mqtt_client_t *client) { | 67 | void mqtt_client_cleanup(mqtt_client_t *client) |
| 37 | mosquitto_destroy(client->mosq); | 68 | { |
| 38 | mosquitto_lib_cleanup(); | 69 | if (!client) |
| 70 | return; | ||
| 71 | |||
| 72 | if (client->mosq) | ||
| 73 | mosquitto_destroy(client->mosq); | ||
| 74 | |||
| 75 | mosquitto_lib_cleanup(); | ||
| 39 | } | 76 | } |
| 40 | 77 | ||
| 41 | void mqtt_client_loop(mqtt_client_t *client) { | 78 | void mqtt_client_loop(mqtt_client_t *client) |
| 42 | mosquitto_loop_forever(client->mosq, -1, 1); | 79 | { |
| 80 | if (client && client->mosq) | ||
| 81 | mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS, | ||
| 82 | LOOP_MAX_MESSAGES); | ||
| 43 | } | 83 | } |
| 44 | 84 | ||
| 45 | static void on_message(struct mosquitto *mosq, void *userdata, | 85 | static void on_message(struct mosquitto *mosq, void *userdata, |
| 46 | const struct mosquitto_message *msg) { | 86 | const struct mosquitto_message *message) |
| 47 | logger_handle_message(msg->payload, msg->payloadlen); | 87 | { |
| 88 | if (message && message->payload) | ||
| 89 | logger_handle_message(message->payload, message->payloadlen); | ||
| 90 | } | ||
| 91 | |||
| 92 | static int fail_with_cleanup(mqtt_client_t *client, const char *error_message, | ||
| 93 | int mosq_error_code) | ||
| 94 | { | ||
| 95 | fprintf(stderr, "%s: %s\n", error_message, | ||
| 96 | mosquitto_strerror(mosq_error_code)); | ||
| 97 | if (client && client->mosq) | ||
| 98 | mosquitto_destroy(client->mosq); | ||
| 99 | return 1; | ||
| 48 | } | 100 | } |
