aboutsummaryrefslogtreecommitdiffstats
path: root/analysis/rtt
diff options
context:
space:
mode:
authorFilip Wandzio <contact@philw.dev>2025-09-05 03:30:24 +0200
committerFilip Wandzio <contact@philw.dev>2025-09-05 03:30:24 +0200
commit01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8 (patch)
tree33748d0019e3939bd0daf50940407e51d4325a8f /analysis/rtt
parent1ba21da6cbc63c0c549fb92731e25bedc482eb51 (diff)
downloade1-01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8.tar.gz
e1-01713bbe20d2cf5aafbe5eb32721d3e4fc5823d8.zip
Standarize the project directory for monorepo-like developer experience
Move the clang formatter to the root of the three so all nested projects could use it Provide README for all other projects Refactor the code in rtt agregator Signed-off-by: Filip Wandzio <contact@philw.dev>
Diffstat (limited to 'analysis/rtt')
-rw-r--r--analysis/rtt/README.txt40
-rw-r--r--analysis/rtt/src/logger.c151
-rw-r--r--analysis/rtt/src/logger.h22
-rw-r--r--analysis/rtt/src/main.c40
-rw-r--r--analysis/rtt/src/mqtt_client.c114
-rw-r--r--analysis/rtt/src/mqtt_client.h29
6 files changed, 286 insertions, 110 deletions
diff --git a/analysis/rtt/README.txt b/analysis/rtt/README.txt
new file mode 100644
index 0000000..4a45e6d
--- /dev/null
+++ b/analysis/rtt/README.txt
@@ -0,0 +1,40 @@
1Project: MQTT RTT and Throughput Logger
2
3Description:
4This project implements an MQTT client that connects to a broker, subscribes to a topic, and logs message round-trip time (RTT) and throughput statistics to a CSV file and standard output. The logger measures the time difference between message sent timestamps and received timestamps to calculate RTT and maintains a sliding window of message times to estimate throughput in messages per second.
5
6Files:
7
8mqtt_client.h / mqtt_client.c: MQTT client wrapper using the Mosquitto library.
9
10logger.h / logger.c: Logger module for handling incoming MQTT messages and logging RTT/throughput.
11
12main.c: Entry point initializing the MQTT client and logger, then running the MQTT client loop.
13
14Dependencies:
15
16Mosquitto library (libmosquitto)
17
18Standard C libraries (stdio, stdlib, string, sys/time, time)
19
20Build Instructions:
21
22Look at the Dockerfile and Makefile to find out.
23Usage:
24
25Configure MQTT broker address, port, and topic in main.c.
26
27Run the compiled executable.
28
29The program connects to the broker, subscribes to the specified topic, and logs RTT and throughput stats to a CSV file.
30
31Output is also printed to the console.
32
33Notes:
34
35The logger expects the payload of received messages to contain a timestamp string representing the sent time in milliseconds.
36
37The program uses blocking call mosquitto_loop_forever to run the MQTT network loop.
38
39Contact:
40For questions contact Filip Wandzio at contact@philw.dev
diff --git a/analysis/rtt/src/logger.c b/analysis/rtt/src/logger.c
index ed5f6e5..e1dd2f3 100644
--- a/analysis/rtt/src/logger.c
+++ b/analysis/rtt/src/logger.c
@@ -1,4 +1,3 @@
1
2#include "logger.h" 1#include "logger.h"
3#include <stdio.h> 2#include <stdio.h>
4#include <stdlib.h> 3#include <stdlib.h>
@@ -9,78 +8,120 @@
9#define WINDOW_SEC 1 8#define WINDOW_SEC 1
10#define MAX_TIMES 1000 9#define MAX_TIMES 1000
11 10
11#define ISO_TIME_BUF_SIZE 32
12
13#define MICROSECONDS_IN_SECOND 1000000
14#define MICROSECONDS_IN_MILLISECOND 1000
15
16#define CSV_HEADER "timestamp,sent_ms,received_ms,rtt_ms,throughput_msg_per_s\n"
17#define CSV_LINE_FORMAT "%s,%lld,%lld,%lld,%d\n"
18
12static FILE *csv_file = NULL; 19static FILE *csv_file = NULL;
13static double msg_times[MAX_TIMES]; 20static double msg_times[MAX_TIMES];
14static int msg_count = 0; 21static int msg_count = 0;
15 22
16static long long current_time_ms() { 23/**
17 struct timeval tv; 24 * @brief Returns the current time in milliseconds since the Epoch.
18 gettimeofday(&tv, NULL); 25 *
19 return ((long long)tv.tv_sec * 1000) + (tv.tv_usec / 1000); 26 * @return Current time in milliseconds (int64).
27 */
28static long long current_time_ms(void);
29
30/**
31 * @brief Returns the current time in seconds (with microsecond precision) since
32 * the Epoch.
33 *
34 * @return Current time in seconds (double).
35 */
36static double current_time_sec(void);
37
38/**
39 * @brief Updates the sliding window of message timestamps and returns the count
40 * of messages within WINDOW_SEC.
41 *
42 * @param now Current timestamp in seconds.
43 * @return Number of messages in the last WINDOW_SEC seconds.
44 */
45static int update_throughput_window(double now);
46
47static long long current_time_ms(void)
48{
49 struct timeval tv;
50 gettimeofday(&tv, NULL);
51 return ((long long)tv.tv_sec * 1000) +
52 (tv.tv_usec / MICROSECONDS_IN_MILLISECOND);
20} 53}
21 54
22static double current_time_sec() { 55static double current_time_sec(void)
23 struct timeval tv; 56{
24 gettimeofday(&tv, NULL); 57 struct timeval tv;
25 return (double)tv.tv_sec + (tv.tv_usec / 1e6); 58 gettimeofday(&tv, NULL);
59 return (double)tv.tv_sec +
60 (tv.tv_usec / (double)MICROSECONDS_IN_SECOND);
26} 61}
27 62
28static int update_throughput_window(double now) { 63static int update_throughput_window(double current_time)
29 int i, new_count = 0; 64{
30 for (i = 0; i < msg_count; i++) { 65 int msg_index, window_count = 0;
31 if (now - msg_times[i] < WINDOW_SEC) { 66 for (msg_index = 0; msg_index < msg_count; msg_index++) {
32 msg_times[new_count++] = msg_times[i]; 67 if (current_time - msg_times[msg_index] < WINDOW_SEC)
33 } 68 msg_times[window_count++] = msg_times[msg_index];
34 } 69 }
35 msg_times[new_count++] = now; 70 msg_times[window_count++] = current_time;
36 msg_count = new_count; 71 msg_count = window_count;
37 return msg_count; 72 return msg_count;
38} 73}
39 74
40void logger_init(const char *filename) { 75void logger_init(const char *filename)
41 csv_file = fopen(filename, "w"); 76{
42 if (!csv_file) { 77 csv_file = fopen(filename, "w");
43 perror("Cannot open CSV file"); 78 if (!csv_file) {
44 exit(EXIT_FAILURE); 79 perror("Cannot open CSV file");
45 } 80 exit(EXIT_FAILURE);
46 fprintf(csv_file, 81 }
47 "timestamp,sent_ms,received_ms,rtt_ms,throughput_msg_per_s\n"); 82 fprintf(csv_file, CSV_HEADER);
48 fflush(csv_file); 83 fflush(csv_file);
49} 84}
50 85
51void logger_cleanup() { 86void logger_cleanup(void)
52 if (csv_file) { 87{
53 fclose(csv_file); 88 if (csv_file) {
54 csv_file = NULL; 89 fclose(csv_file);
55 } 90 csv_file = NULL;
91 }
56} 92}
57 93
58void logger_handle_message(const void *payload, int payloadlen) { 94void logger_handle_message(const void *payload, int payloadlen)
59 char *payload_str = malloc(payloadlen + 1); 95{
60 if (!payload_str) 96 if (payloadlen <= 0)
61 return; 97 return;
62 memcpy(payload_str, payload, payloadlen); 98
63 payload_str[payloadlen] = '\0'; 99 char *payload_str = malloc(payloadlen + 1);
100 if (!payload_str)
101 return;
102
103 memcpy(payload_str, payload, payloadlen);
104 payload_str[payloadlen] = '\0';
64 105
65 long long sent_ms = atoll(payload_str); 106 long long sent_ms = atoll(payload_str);
66 free(payload_str); 107 free(payload_str);
67 108
68 long long received_ms = current_time_ms(); 109 long long received_ms = current_time_ms();
69 long long rtt = received_ms - sent_ms; 110 long long rtt = received_ms - sent_ms;
70 111
71 double now_sec = current_time_sec(); 112 double now_sec = current_time_sec();
72 int throughput = update_throughput_window(now_sec); 113 int throughput = update_throughput_window(now_sec);
73 114
74 time_t now = time(NULL); 115 time_t now = time(NULL);
75 struct tm *tm_info = localtime(&now); 116 struct tm *tm_info = localtime(&now);
76 char iso_time[32]; 117 char iso_time[ISO_TIME_BUF_SIZE];
77 strftime(iso_time, sizeof(iso_time), "%Y-%m-%dT%H:%M:%S", tm_info); 118 strftime(iso_time, sizeof(iso_time), "%Y-%m-%dT%H:%M:%S", tm_info);
78 119
79 if (csv_file) { 120 if (csv_file) {
80 fprintf(csv_file, "%s,%lld,%lld,%lld,%d\n", iso_time, sent_ms, received_ms, 121 fprintf(csv_file, CSV_LINE_FORMAT, iso_time, sent_ms,
81 rtt, throughput); 122 received_ms, rtt, throughput);
82 fflush(csv_file); 123 fflush(csv_file);
83 } 124 }
84 125
85 printf("RTT: %lld ms | Throughput: %d msg/s\n", rtt, throughput); 126 printf("RTT: %lld ms | Throughput: %d msg/s\n", rtt, throughput);
86} 127}
diff --git a/analysis/rtt/src/logger.h b/analysis/rtt/src/logger.h
index f1602c9..904c327 100644
--- a/analysis/rtt/src/logger.h
+++ b/analysis/rtt/src/logger.h
@@ -1,9 +1,25 @@
1
2#ifndef LOGGER_H 1#ifndef LOGGER_H
3#define LOGGER_H 2#define LOGGER_H
4 3
4/**
5 * @brief Initializes the logger by opening the CSV file and writing the header.
6 *
7 * @param filename Path to the CSV log file.
8 */
5void logger_init(const char *filename); 9void logger_init(const char *filename);
6void logger_cleanup(); 10
11/**
12 * @brief Closes the CSV file and cleans up resources.
13 */
14void logger_cleanup(void);
15
16/**
17 * @brief Processes an incoming message payload, parses the timestamp,
18 * calculates RTT and throughput, and logs this data to the CSV and stdout.
19 *
20 * @param payload Pointer to the message payload.
21 * @param payloadlen Length of the payload in bytes.
22 */
7void logger_handle_message(const void *payload, int payloadlen); 23void logger_handle_message(const void *payload, int payloadlen);
8 24
9#endif // LOGGER_H 25#endif
diff --git a/analysis/rtt/src/main.c b/analysis/rtt/src/main.c
index 13f7716..163b5f6 100644
--- a/analysis/rtt/src/main.c
+++ b/analysis/rtt/src/main.c
@@ -1,4 +1,3 @@
1
2#include "logger.h" 1#include "logger.h"
3#include "mqtt_client.h" 2#include "mqtt_client.h"
4#include <stdio.h> 3#include <stdio.h>
@@ -8,21 +7,26 @@
8#define TOPIC "device/echo/in" 7#define TOPIC "device/echo/in"
9#define CSV_FILE "rtt_throughput_log.csv" 8#define CSV_FILE "rtt_throughput_log.csv"
10 9
11int main() { 10/**
12 mqtt_client_t client; 11 * @brief Entry point for MQTT client program.
13 12 *
14 logger_init(CSV_FILE); 13 * Initializes the logger and MQTT client, then enters the MQTT loop to receive
15 14 * messages. Cleans up resources on exit.
16 if (mqtt_client_init(&client, BROKER_ADDRESS, BROKER_PORT, TOPIC) != 0) { 15 *
17 fprintf(stderr, "MQTT initialization error\n"); 16 * @return int Exit code (0 on success, 1 on failure).
18 return 1; 17 */
19 } 18int main(void)
20 19{
21 printf("Connection established.Waiting for messages...\n"); 20 mqtt_client_t client;
22 mqtt_client_loop(&client); 21 logger_init(CSV_FILE);
23 22
24 logger_cleanup(); 23 if (mqtt_client_init(&client, BROKER_ADDRESS, BROKER_PORT, TOPIC))
25 mqtt_client_cleanup(&client); 24 return fprintf(stderr, "MQTT initialization error\n"), 1;
26 25
27 return 0; 26 printf("Connection established. Waiting for messages...\n");
27 mqtt_client_loop(&client);
28 logger_cleanup();
29 mqtt_client_cleanup(&client);
30
31 return 0;
28} 32}
diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c
index 43a06d8..edc394d 100644
--- a/analysis/rtt/src/mqtt_client.c
+++ b/analysis/rtt/src/mqtt_client.c
@@ -3,46 +3,98 @@
3#include <stdio.h> 3#include <stdio.h>
4#include <stdlib.h> 4#include <stdlib.h>
5 5
6#define WINDOW_SEC 1 6#define MQTT_KEEPALIVE_SECONDS 60
7#define MQTT_QOS_LEVEL 0
8#define LOOP_TIMEOUT_MS -1
9#define LOOP_MAX_MESSAGES 1
7 10
11/**
12 * @brief Callback called when a message arrives on a subscribed topic.
13 *
14 * @param mosq Mosquitto client instance.
15 * @param userdata User data pointer (unused).
16 * @param msg Incoming message data.
17 */
8static void on_message(struct mosquitto *mosq, void *userdata, 18static void on_message(struct mosquitto *mosq, void *userdata,
9 const struct mosquitto_message *msg); 19 const struct mosquitto_message *message);
20
21/**
22 * @brief Helper function to handle errors by printing a message,
23 * cleaning up the MQTT client, and returning failure.
24 *
25 * @param client MQTT client instance.
26 * @param error_message Error message to print.
27 * @param mosq_error_code Mosquitto error code.
28 * @return int Always returns 1 to indicate failure.
29 */
30static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
31 int mosq_error_code);
10 32
11int mqtt_client_init(mqtt_client_t *client, const char *broker_address, 33int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
12 int port, const char *topic) { 34 int broker_port, const char *topic)
13 mosquitto_lib_init(); 35{
14 client->mosq = mosquitto_new(NULL, true, NULL); 36 if (!client || !broker_address || !topic)
15 if (!client->mosq) { 37 return fprintf(stderr, "Invalid MQTT init parameters\n"), 1;
16 fprintf(stderr, "Cannot create MQTT client\n"); 38
17 return 1; 39 int init_result = mosquitto_lib_init();
18 } 40 if (init_result != MOSQ_ERR_SUCCESS) {
19 41 fprintf(stderr, "Mosquitto library initialization failed: %s\n",
20 mosquitto_message_callback_set(client->mosq, on_message); 42 mosquitto_strerror(init_result));
21 43 return 1;
22 if (mosquitto_connect(client->mosq, broker_address, port, 60) != 44 }
23 MOSQ_ERR_SUCCESS) { 45
24 fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); 46 client->mosq = mosquitto_new(NULL, true, NULL);
25 return 1; 47 if (!client->mosq)
26 } 48 return fprintf(stderr, "Cannot create MQTT client\n"), 1;
27 49
28 if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { 50 mosquitto_message_callback_set(client->mosq, on_message);
29 fprintf(stderr, "Cannot subscribe to topic\n"); 51
30 return 1; 52 int connect_result = mosquitto_connect(
31 } 53 client->mosq, broker_address, broker_port, MQTT_KEEPALIVE_SECONDS);
32 54 if (connect_result != MOSQ_ERR_SUCCESS)
33 return 0; 55 return fail_with_cleanup(
56 client, "Cannot connect to MQTT broker", connect_result);
57
58 int subscribe_result =
59 mosquitto_subscribe(client->mosq, NULL, topic, MQTT_QOS_LEVEL);
60 if (subscribe_result != MOSQ_ERR_SUCCESS)
61 return fail_with_cleanup(client, "Cannot subscribe to topic",
62 subscribe_result);
63
64 return 0;
34} 65}
35 66
36void mqtt_client_cleanup(mqtt_client_t *client) { 67void mqtt_client_cleanup(mqtt_client_t *client)
37 mosquitto_destroy(client->mosq); 68{
38 mosquitto_lib_cleanup(); 69 if (!client)
70 return;
71
72 if (client->mosq)
73 mosquitto_destroy(client->mosq);
74
75 mosquitto_lib_cleanup();
39} 76}
40 77
41void mqtt_client_loop(mqtt_client_t *client) { 78void mqtt_client_loop(mqtt_client_t *client)
42 mosquitto_loop_forever(client->mosq, -1, 1); 79{
80 if (client && client->mosq)
81 mosquitto_loop_forever(client->mosq, LOOP_TIMEOUT_MS,
82 LOOP_MAX_MESSAGES);
43} 83}
44 84
45static void on_message(struct mosquitto *mosq, void *userdata, 85static void on_message(struct mosquitto *mosq, void *userdata,
46 const struct mosquitto_message *msg) { 86 const struct mosquitto_message *message)
47 logger_handle_message(msg->payload, msg->payloadlen); 87{
88 if (message && message->payload)
89 logger_handle_message(message->payload, message->payloadlen);
90}
91
92static int fail_with_cleanup(mqtt_client_t *client, const char *error_message,
93 int mosq_error_code)
94{
95 fprintf(stderr, "%s: %s\n", error_message,
96 mosquitto_strerror(mosq_error_code));
97 if (client && client->mosq)
98 mosquitto_destroy(client->mosq);
99 return 1;
48} 100}
diff --git a/analysis/rtt/src/mqtt_client.h b/analysis/rtt/src/mqtt_client.h
index 781e742..1faa4ad 100644
--- a/analysis/rtt/src/mqtt_client.h
+++ b/analysis/rtt/src/mqtt_client.h
@@ -1,16 +1,39 @@
1
2#ifndef MQTT_CLIENT_H 1#ifndef MQTT_CLIENT_H
3#define MQTT_CLIENT_H 2#define MQTT_CLIENT_H
4 3
5#include <mosquitto.h> 4#include <mosquitto.h>
6 5
7typedef struct { 6typedef struct {
8 struct mosquitto *mosq; 7 struct mosquitto *mosq;
9} mqtt_client_t; 8} mqtt_client_t;
10 9
10/**
11 * @brief Initialize the MQTT client, connect to the broker, and subscribe to a
12 * topic.
13 *
14 * @param client Pointer to mqtt_client_t struct.
15 * @param broker_address MQTT broker IP or hostname.
16 * @param port Broker port number.
17 * @param topic Topic to subscribe to.
18 * @return int 0 on success, 1 on failure.
19 */
11int mqtt_client_init(mqtt_client_t *client, const char *broker_address, 20int mqtt_client_init(mqtt_client_t *client, const char *broker_address,
12 int port, const char *topic); 21 int port, const char *topic);
22
23/**
24 * @brief Cleanup MQTT client resources and free associated memory.
25 *
26 * @param client Pointer to mqtt_client_t struct.
27 */
13void mqtt_client_cleanup(mqtt_client_t *client); 28void mqtt_client_cleanup(mqtt_client_t *client);
29
30/**
31 * @brief Run the MQTT client loop to process network events.
32 *
33 * This function blocks indefinitely.
34 *
35 * @param client Pointer to mqtt_client_t struct.
36 */
14void mqtt_client_loop(mqtt_client_t *client); 37void mqtt_client_loop(mqtt_client_t *client);
15 38
16#endif // MQTT_CLIENT_H 39#endif