aboutsummaryrefslogtreecommitdiffstats
path: root/analysis/rtt/src
diff options
context:
space:
mode:
Diffstat (limited to 'analysis/rtt/src')
-rw-r--r--analysis/rtt/src/logger.c151
-rw-r--r--analysis/rtt/src/logger.h22
-rw-r--r--analysis/rtt/src/main.c40
-rw-r--r--analysis/rtt/src/mqtt_client.c114
-rw-r--r--analysis/rtt/src/mqtt_client.h29
5 files changed, 246 insertions, 110 deletions
diff --git a/analysis/rtt/src/logger.c b/analysis/rtt/src/logger.c
index ed5f6e5..e1dd2f3 100644
--- a/analysis/rtt/src/logger.c
+++ b/analysis/rtt/src/logger.c
@@ -1,4 +1,3 @@
1
2#include "logger.h" 1#include "logger.h"
3#include <stdio.h> 2#include <stdio.h>
4#include <stdlib.h> 3#include <stdlib.h>
@@ -9,78 +8,120 @@
9#define WINDOW_SEC 1 8#define WINDOW_SEC 1
10#define MAX_TIMES 1000 9#define MAX_TIMES 1000
11 10
11#define ISO_TIME_BUF_SIZE 32
12
13#define MICROSECONDS_IN_SECOND 1000000
14#define MICROSECONDS_IN_MILLISECOND 1000
15
16#define CSV_HEADER "timestamp,sent_ms,received_ms,rtt_ms,throughput_msg_per_s\n"
17#define CSV_LINE_FORMAT "%s,%lld,%lld,%lld,%d\n"
18
12static FILE *csv_file = NULL; 19static FILE *csv_file = NULL;
13static double msg_times[MAX_TIMES]; 20static double msg_times[MAX_TIMES];
14static int msg_count = 0; 21static int msg_count = 0;
15 22
16static long long current_time_ms() { 23/**
17 struct timeval tv; 24 * @brief Returns the current time in milliseconds since the Epoch.
18 gettimeofday(&tv, NULL); 25 *
19 return ((long long)tv.tv_sec * 1000) + (tv.tv_usec / 1000); 26 * @return Current time in milliseconds (int64).
27 */
28static long long current_time_ms(void);
29
30/**
31 * @brief Returns the current time in seconds (with microsecond precision) since
32 * the Epoch.
33 *
34 * @return Current time in seconds (double).
35 */
36static double current_time_sec(void);
37
38/**
39 * @brief Updates the sliding window of message timestamps and returns the count
40 * of messages within WINDOW_SEC.
41 *
42 * @param now Current timestamp in seconds.
43 * @return Number of messages in the last WINDOW_SEC seconds.
44 */
45static int update_throughput_window(double now);
46
47static long long current_time_ms(void)
48{
49 struct timeval tv;
50 gettimeofday(&tv, NULL);
51 return ((long long)tv.tv_sec * 1000) +
52 (tv.tv_usec / MICROSECONDS_IN_MILLISECOND);
20} 53}
21 54
22static double current_time_sec() { 55static double current_time_sec(void)
23 struct timeval tv; 56{
24 gettimeofday(&tv, NULL); 57 struct timeval tv;
25 return (double)tv.tv_sec + (tv.tv_usec / 1e6); 58 gettimeofday(&tv, NULL);
59 return (double)tv.tv_sec +
60 (tv.tv_usec / (double)MICROSECONDS_IN_SECOND);
26} 61}
27 62
28static int update_throughput_window(double now) { 63static int update_throughput_window(double current_time)
29 int i, new_count = 0; 64{
30 for (i = 0; i < msg_count; i++) { 65 int msg_index, window_count = 0;
31 if (now - msg_times[i] < WINDOW_SEC) { 66 for (msg_index = 0; msg_index < msg_count; msg_index++) {
32 msg_times[new_count++] = msg_times[i]; 67 if (current_time - msg_times[msg_index] < WINDOW_SEC)
33 } 68 msg_times[window_count++] = msg_times[msg_index];
34 } 69 }
35 msg_times[new_count++] = now; 70 msg_times[window_count++] = current_time;
36 msg_count = new_count; 71 msg_count = window_count;
37 return msg_count; 72 return msg_count;
38} 73}
39 74
40void logger_init(const char *filename) { 75void logger_init(const char *filename)
41 csv_file = fopen(filename, "w"); 76{
42 if (!csv_file) { 77 csv_file = fopen(filename, "w");
43 perror("Cannot open CSV file"); 78 if (!csv_file) {
44 exit(EXIT_FAILURE); 79 perror("Cannot open CSV file");
45 } 80 exit(EXIT_FAILURE);
46 fprintf(csv_file, 81 }
47 "timestamp,sent_ms,received_ms,rtt_ms,throughput_msg_per_s\n"); 82 fprintf(csv_file, CSV_HEADER);
48 fflush(csv_file); 83 fflush(csv_file);
49} 84}
50 85
51void logger_cleanup() { 86void logger_cleanup(void)
52 if (csv_file) { 87{
53 fclose(csv_file); 88 if (csv_file) {
54 csv_file = NULL; 89 fclose(csv_file);
55 } 90 csv_file = NULL;
91 }
56} 92}
57 93
58void logger_handle_message(const void *payload, int payloadlen) { 94void logger_handle_message(const void *payload, int payloadlen)
59 char *payload_str = malloc(payloadlen + 1); 95{
60 if (!payload_str) 96 if (payloadlen <= 0)
61 return; 97 return;
62 memcpy(payload_str, payload, payloadlen); 98
63 payload_str[payloadlen] = '\0'; 99 char *payload_str = malloc(payloadlen + 1);
100 if (!payload_str)
101 return;
102
103 memcpy(payload_str, payload, payloadlen);
104 payload_str[payloadlen] = '\0';
64 105
65 long long sent_ms = atoll(payload_str); 106 long long sent_ms = atoll(payload_str);
66 free(payload_str); 107 free(payload_str);
67 108
68 long long received_ms = current_time_ms(); 109 long long received_ms = current_time_ms();
69 long long rtt = received_ms - sent_ms; 110 long long rtt = received_ms - sent_ms;
70 111
71 double now_sec = current_time_sec(); 112 double now_sec = current_time_sec();
72 int throughput = update_throughput_window(now_sec); 113 int throughput = update_throughput_window(now_sec);
73 114
74 time_t now = time(NULL); 115 time_t now = time(NULL);
75 struct tm *tm_info = localtime(&now); 116 struct tm *tm_info = localtime(&now);
76 char iso_time[32]; 117 char iso_time[ISO_TIME_BUF_SIZE];
77 strftime(iso_time, sizeof(iso_time), "%Y-%m-%dT%H:%M:%S", tm_info); 118 strftime(iso_time, sizeof(iso_time), "%Y-%m-%dT%H:%M:%S", tm_info);
78 119
79 if (csv_file) { 120 if (csv_file) {
80 fprintf(csv_file, "%s,%lld,%lld,%lld,%d\n", iso_time, sent_ms, received_ms, 121 fprintf(csv_file, CSV_LINE_FORMAT, iso_time, sent_ms,
81 rtt, throughput); 122 received_ms, rtt, throughput);
82 fflush(csv_file); 123 fflush(csv_file);
83 } 124 }
84 125
85 printf("RTT: %lld ms | Throughput: %d msg/s\n", rtt, throughput); 126 printf("RTT: %lld ms | Throughput: %d msg/s\n", rtt, throughput);
86} 127}
diff --git a/analysis/rtt/src/logger.h b/analysis/rtt/src/logger.h
index f1602c9..904c327 100644
--- a/analysis/rtt/src/logger.h
+++ b/analysis/rtt/src/logger.h
@@ -1,9 +1,25 @@
1
2#ifndef LOGGER_H 1#ifndef LOGGER_H
3#define LOGGER_H 2#define LOGGER_H
4 3
4/**
5 * @brief Initializes the logger by opening the CSV file and writing the header.
6 *
7 * @param filename Path to the CSV log file.
8 */
5void logger_init(const char *filename); 9void logger_init(const char *filename);
6void logger_cleanup(); 10
11/**
12 * @brief Closes the CSV file and cleans up resources.
13 */
14void logger_cleanup(void);
15
16/**
17 * @brief Processes an incoming message payload, parses the timestamp,
18 * calculates RTT and throughput, and logs this data to the CSV and stdout.
19 *
20 * @param payload Pointer to the message payload.
21 * @param payloadlen Length of the payload in bytes.
22 */
7void logger_handle_message(const void *payload, int payloadlen); 23void logger_handle_message(const void *payload, int payloadlen);
8 24
9#endif // LOGGER_H 25#endif
diff --git a/analysis/rtt/src/main.c b/analysis/rtt/src/main.c
index 13f7716..163b5f6 100644
--- a/analysis/rtt/src/main.c
+++ b/analysis/rtt/src/main.c
@@ -1,4 +1,3 @@
1
2#include "logger.h" 1#include "logger.h"
3#include "mqtt_client.h" 2#include "mqtt_client.h"
4#include <stdio.h> 3#include <stdio.h>
@@ -8,21 +7,26 @@
8#define TOPIC "device/echo/in" 7#define TOPIC "device/echo/in"
9#define CSV_FILE "rtt_throughput_log.csv" 8#define CSV_FILE "rtt_throughput_log.csv"
10 9
11int main() { 10/**
12 mqtt_client_t client; 11 * @brief Entry point for MQTT client program.
13 12 *
14 logger_init(CSV_FILE); 13 * Initializes the logger and MQTT client, then enters the MQTT loop to receive
15 14 * messages. Cleans up resources on exit.
16 if (mqtt_client_init(&client, BROKER_ADDRESS, BROKER_PORT, TOPIC) != 0) { 15 *
17 fprintf(stderr, "MQTT initialization error\n"); 16 * @return int Exit code (0 on success, 1 on failure).
18 return 1; 17 */
19 } 18int main(void)
20 19{
21 printf("Connection established.Waiting for messages...\n"); 20 mqtt_client_t client;
22 mqtt_client_loop(&client); 21 logger_init(CSV_FILE);
23 22
24 logger_cleanup(); 23 if (mqtt_client_init(&client, BROKER_ADDRESS, BROKER_PORT, TOPIC))
25 mqtt_client_cleanup(&client); 24 return fprintf(stderr, "MQTT initialization error\n"), 1;
26 25
27 return 0; 26 printf("Connection established. Waiting for messages...\n");
27 mqtt_client_loop(&client);
28 logger_cleanup();
29 mqtt_client_cleanup(&client);
30
31 return 0;
28} 32}
diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c
index 43a06d8..edc394d 100644
--- a/analysis/rtt/src/mqtt_client.c
+++ b/analysis/rtt/src/mqtt_client.c
@@ -3,46 +3,98 @@
3#include <stdio.h> 3#include <stdio.h>
4#include <stdlib.h> 4#include <stdlib.h>
5 5
6#define WINDOW_SEC 1 6#define MQTT_KEEPALIVE_SECONDS 60
7#define MQTT_QOS_LEVEL 0
8#define LOOP_TIMEOUT_MS -1
9#define LOOP_MAX_MESSAGES 1
7 10
11/**
12 * @brief Callback called when a message arrives on a subscribed topic.
13 *
14 * @param mosq Mosquitto client instance.
15 * @param userdata User data pointer (unused).
16 * @param msg Incoming message data.
17 */
8static void on_message(struct mosquitto *mosq, void *userdata, 18static void on_message(struct mosquitto *mosq, void *userdata,
9 const struct mosquitto_message *msg); 19 const struct mosquitto_message *message);
20
21/**
22 * @brief Helper function to handle errors by printing a message,
23 * cleaning up the MQTT client, and returning failure.
24 *
25 * @param client MQTT client instance.
26 * @param error_message Error message to print.
27 * @param mosq_error_code Mosquitto error code.
28 * @return int Always returns 1 to indicate failure.
29 */
30static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
31 int mosq_error_code);
10 32
11int mqtt_client_init(mqtt_client_t *client, const char *broker_address, 33int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
12 int port, const char *topic) { 34 int broker_port, const char *topic)
13 mosquitto_lib_init(); 35{
14 client->mosq = mosquitto_new(NULL, true, NULL); 36 if (!client || !broker_address || !topic)
15 if (!client->mosq) { 37 return fprintf(stderr, "Invalid MQTT init parameters\n"), 1;
16 fprintf(stderr, "Cannot create MQTT client\n"); 38
17 return 1; 39 int init_result = mosquitto_lib_init();
18 } 40 if (init_result != MOSQ_ERR_SUCCESS) {
19 41 fprintf(stderr, "Mosquitto library initialization failed: %s\n",
20 mosquitto_message_callback_set(client->mosq, on_message); 42 mosquitto_strerror(init_result));
21 43 return 1;
22 if (mosquitto_connect(client->mosq, broker_address, port, 60) != 44 }
23 MOSQ_ERR_SUCCESS) { 45
24 fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); 46 client->mosq = mosquitto_new(NULL, true, NULL);
25 return 1; 47 if (!client->mosq)
26 } 48 return fprintf(stderr, "Cannot create MQTT client\n"), 1;
27 49
28 if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { 50 mosquitto_message_callback_set(client->mosq, on_message);
29 fprintf(stderr, "Cannot subscribe to topic\n"); 51
30 return 1; 52 int connect_result = mosquitto_connect(
31 } 53 client->mosq, broker_address, broker_port, MQTT_KEEPALIVE_SECONDS);
32 54 if (connect_result != MOSQ_ERR_SUCCESS)
33 return 0; 55 return fail_with_cleanup(
56 client, "Cannot connect to MQTT broker", connect_result);
57
58 int subscribe_result =
59 mosquitto_subscribe(client->mosq, NULL, topic, MQTT_QOS_LEVEL);
60 if (subscribe_result != MOSQ_ERR_SUCCESS)
61 return fail_with_cleanup(client, "Cannot subscribe to topic",
62 subscribe_result);
63
64 return 0;
34} 65}
35 66
36void mqtt_client_cleanup(mqtt_client_t *client) { 67void mqtt_client_cleanup(mqtt_client_t *client)
37 mosquitto_destroy(client->mosq); 68{
38 mosquitto_lib_cleanup(); 69 if (!client)
70 return;
71
72 if (client->mosq)
73 mosquitto_destroy(client->mosq);
74
75 mosquitto_lib_cleanup();
39} 76}
40 77
41void mqtt_client_loop(mqtt_client_t *client) { 78void mqtt_client_loop(mqtt_client_t *client)
42 mosquitto_loop_forever(client->mosq, -1, 1); 79{
80 if (client && client->mosq)
81 mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS,
82 LOOP_MAX_MESSAGES);
43} 83}
44 84
45static void on_message(struct mosquitto *mosq, void *userdata, 85static void on_message(struct mosquitto *mosq, void *userdata,
46 const struct mosquitto_message *msg) { 86 const struct mosquitto_message *message)
47 logger_handle_message(msg->payload, msg->payloadlen); 87{
88 if (message && message->payload)
89 logger_handle_message(message->payload, message->payloadlen);
90}
91
92static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
93 int mosq_error_code)
94{
95 fprintf(stderr, "%s: %s\n", error_message,
96 mosquitto_strerror(mosq_error_code));
97 if (client && client->mosq)
98 mosquitto_destroy(client->mosq);
99 return 1;
48} 100}
diff --git a/analysis/rtt/src/mqtt_client.h b/analysis/rtt/src/mqtt_client.h
index 781e742..1faa4ad 100644
--- a/analysis/rtt/src/mqtt_client.h
+++ b/analysis/rtt/src/mqtt_client.h
@@ -1,16 +1,39 @@
1
2#ifndef MQTT_CLIENT_H 1#ifndef MQTT_CLIENT_H
3#define MQTT_CLIENT_H 2#define MQTT_CLIENT_H
4 3
5#include <mosquitto.h> 4#include <mosquitto.h>
6 5
7typedef struct { 6typedef struct {
8 struct mosquitto *mosq; 7 struct mosquitto *mosq;
9} mqtt_client_t; 8} mqtt_client_t;
10 9
10/**
11 * @brief Initialize the MQTT client, connect to the broker, and subscribe to a
12 * topic.
13 *
14 * @param client Pointer to mqtt_client_t struct.
15 * @param broker_address MQTT broker IP or hostname.
16 * @param port Broker port number.
17 * @param topic Topic to subscribe to.
18 * @return int 0 on success, 1 on failure.
19 */
11int mqtt_client_init(mqtt_client_t *client, const char *broker_address, 20int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
12 int port, const char *topic); 21 int port, const char *topic);
22
23/**
24 * @brief Cleanup MQTT client resources and free associated memory.
25 *
26 * @param client Pointer to mqtt_client_t struct.
27 */
13void mqtt_client_cleanup(mqtt_client_t *client); 28void mqtt_client_cleanup(mqtt_client_t *client);
29
30/**
31 * @brief Run the MQTT client loop to process network events.
32 *
33 * This function blocks indefinitely.
34 *
35 * @param client Pointer to mqtt_client_t struct.
36 */
14void mqtt_client_loop(mqtt_client_t *client); 37void mqtt_client_loop(mqtt_client_t *client);
15 38
16#endif // MQTT_CLIENT_H 39#endif