From 1ba21da6cbc63c0c549fb92731e25bedc482eb51 Mon Sep 17 00:00:00 2001 From: Filip Wandzio Date: Thu, 4 Sep 2025 22:25:39 +0200 Subject: Unify the directory, add new analysis methods, unify the code style Signed-off-by: Filip Wandzio --- analysis/e1anl/Makefile | 27 +++++ .../e1anl/__pycache__/agregate.cpython-313.pyc | Bin 0 -> 3423 bytes analysis/e1anl/__pycache__/analyze.cpython-313.pyc | Bin 0 -> 2835 bytes analysis/e1anl/requirements.txt | 16 +++ .../e1anl/src/__pycache__/agregate.cpython-313.pyc | Bin 0 -> 3528 bytes .../e1anl/src/__pycache__/analyze.cpython-313.pyc | Bin 0 -> 2962 bytes analysis/e1anl/src/agregate.py | 71 +++++++++++++ analysis/e1anl/src/analyze.py | 49 +++++++++ analysis/e1anl/src/main.py | 17 +++ analysis/rtt/Dockerfile | 29 +++++ analysis/rtt/Makefile | 22 ++++ analysis/rtt/src/logger.c | 86 +++++++++++++++ analysis/rtt/src/logger.h | 9 ++ analysis/rtt/src/main.c | 28 +++++ analysis/rtt/src/mqtt_client.c | 48 +++++++++ analysis/rtt/src/mqtt_client.h | 16 +++ analysis/rtt/src/mqtt_rtt_logger.c | 117 +++++++++++++++++++++ 17 files changed, 535 insertions(+) create mode 100644 analysis/e1anl/Makefile create mode 100644 analysis/e1anl/__pycache__/agregate.cpython-313.pyc create mode 100644 analysis/e1anl/__pycache__/analyze.cpython-313.pyc create mode 100644 analysis/e1anl/requirements.txt create mode 100644 analysis/e1anl/src/__pycache__/agregate.cpython-313.pyc create mode 100644 analysis/e1anl/src/__pycache__/analyze.cpython-313.pyc create mode 100644 analysis/e1anl/src/agregate.py create mode 100644 analysis/e1anl/src/analyze.py create mode 100644 analysis/e1anl/src/main.py create mode 100644 analysis/rtt/Dockerfile create mode 100644 analysis/rtt/Makefile create mode 100644 analysis/rtt/src/logger.c create mode 100644 analysis/rtt/src/logger.h create mode 100644 analysis/rtt/src/main.c create mode 100644 analysis/rtt/src/mqtt_client.c create mode 100644 analysis/rtt/src/mqtt_client.h create mode 100644 analysis/rtt/src/mqtt_rtt_logger.c (limited to 'analysis') diff --git a/analysis/e1anl/Makefile b/analysis/e1anl/Makefile new file mode 100644 index 0000000..7cbd4a3 --- /dev/null +++ b/analysis/e1anl/Makefile @@ -0,0 +1,27 @@ +# Makefile + +VENV_NAME=venv +PYTHON=$(VENV_NAME)/bin/python +SRC=src +OUTPUT=output + +.PHONY: venv install run analyze clean + +venv: + python3 -m venv $(VENV_NAME) + +install: venv + $(PYTHON) -m pip install --upgrade pip + $(PYTHON) -m pip install -r requirements.txt + +run: install + mkdir -p $(OUTPUT) + $(PYTHON) $(SRC)/main.py + +analyze: install + mkdir -p $(OUTPUT) + $(PYTHON) $(SRC)/analyze_metrics.py + +clean: + rm -rf $(VENV_NAME) + rm -rf $(OUTPUT) diff --git a/analysis/e1anl/__pycache__/agregate.cpython-313.pyc b/analysis/e1anl/__pycache__/agregate.cpython-313.pyc new file mode 100644 index 0000000..21c7510 Binary files /dev/null and b/analysis/e1anl/__pycache__/agregate.cpython-313.pyc differ diff --git a/analysis/e1anl/__pycache__/analyze.cpython-313.pyc b/analysis/e1anl/__pycache__/analyze.cpython-313.pyc new file mode 100644 index 0000000..0b6a4ce Binary files /dev/null and b/analysis/e1anl/__pycache__/analyze.cpython-313.pyc differ diff --git a/analysis/e1anl/requirements.txt b/analysis/e1anl/requirements.txt new file mode 100644 index 0000000..3c9cae6 --- /dev/null +++ b/analysis/e1anl/requirements.txt @@ -0,0 +1,16 @@ +contourpy==1.3.3 +cycler==0.12.1 +fonttools==4.59.2 +kiwisolver==1.4.9 +matplotlib==3.10.6 +numpy==2.3.2 +packaging==25.0 +paho-mqtt==2.1.0 +pandas==2.3.2 +pillow==11.3.0 +pyparsing==3.2.3 +python-dateutil==2.9.0.post0 +pytz==2025.2 +seaborn==0.13.2 +six==1.17.0 +tzdata==2025.2 diff --git a/analysis/e1anl/src/__pycache__/agregate.cpython-313.pyc b/analysis/e1anl/src/__pycache__/agregate.cpython-313.pyc new file mode 100644 index 0000000..3a0d6cb Binary files /dev/null and b/analysis/e1anl/src/__pycache__/agregate.cpython-313.pyc differ diff --git a/analysis/e1anl/src/__pycache__/analyze.cpython-313.pyc b/analysis/e1anl/src/__pycache__/analyze.cpython-313.pyc new file mode 100644 index 0000000..a891763 Binary files /dev/null and b/analysis/e1anl/src/__pycache__/analyze.cpython-313.pyc differ diff --git a/analysis/e1anl/src/agregate.py b/analysis/e1anl/src/agregate.py new file mode 100644 index 0000000..1054288 --- /dev/null +++ b/analysis/e1anl/src/agregate.py @@ -0,0 +1,71 @@ +# mqtt_receiver.py + +import paho.mqtt.client as mqtt +import csv +import time +from datetime import datetime +from collections import deque +import os +os.makedirs("output", exist_ok=True) + +BROKER = "192.168.1.101" +SUB_TOPIC = "device/echo/in" +CSV_FILE = "output/rtt_throughput_log.csv" +WINDOW_SEC = 1 + +msg_times = deque() +should_stop = False # flag to stop loop externally + + +def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to broker") + client.subscribe(SUB_TOPIC) + else: + print("Failed to connect, return code:", rc) + + +def on_message(client, userdata, msg): + try: + payload_str = msg.payload.decode() + sent_ms = int(payload_str) + received_ms = int(time.time() * 1000) + rtt = received_ms - sent_ms + timestamp = datetime.now().isoformat() + + now_sec = time.time() + msg_times.append(now_sec) + while msg_times and msg_times[0] < now_sec - WINDOW_SEC: + msg_times.popleft() + throughput = len(msg_times) / WINDOW_SEC + + with open(CSV_FILE, "a", newline="") as f: + writer = csv.writer(f) + writer.writerow([timestamp, sent_ms, received_ms, rtt, throughput]) + + except Exception as e: + print("Error processing message:", e) + + +def collect_data(duration_sec): + global should_stop + should_stop = False + + # Start CSV log + with open(CSV_FILE, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["timestamp", "sent_ms", "received_ms", "rtt_ms", "throughput_msg_per_s"]) + + client = mqtt.Client() + client.on_connect = on_connect + client.on_message = on_message + client.connect(BROKER, 1883, 60) + + client.loop_start() + print(f"Zbieranie danych przez {duration_sec / 60:.1f} minut...") + + time.sleep(duration_sec) + + print("Zbieranie danych zakończone.") + client.loop_stop() + client.disconnect() diff --git a/analysis/e1anl/src/analyze.py b/analysis/e1anl/src/analyze.py new file mode 100644 index 0000000..6e73df3 --- /dev/null +++ b/analysis/e1anl/src/analyze.py @@ -0,0 +1,49 @@ +# analyze_metrics.py + +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns + +import os +os.makedirs("output", exist_ok=True) + +def analyze(): + CSV_FILE = "output/rtt_throughput_log.csv" + df = pd.read_csv(CSV_FILE) + + df['sent_ms'] = df['sent_ms'].astype(int) + df['received_ms'] = df['received_ms'].astype(int) + df['rtt_ms'] = df['rtt_ms'].astype(int) + df['throughput_msg_per_s'] = df['throughput_msg_per_s'].astype(float) + + mean_rtt = df['rtt_ms'].mean() + p95_rtt = df['rtt_ms'].quantile(0.95) + mean_throughput = df['throughput_msg_per_s'].mean() + + print(f"Średnie RTT: {mean_rtt:.2f} ms") + print(f"RTT 95-percentyl: {p95_rtt:.2f} ms") + print(f"Średni throughput: {mean_throughput:.2f} msg/s") + + plt.figure(figsize=(6,4)) + sns.barplot(x=['Variant A'], y=[mean_rtt], palette="Set2") + plt.title("Średnie RTT - wariant A") + plt.ylabel("RTT [ms]") + plt.tight_layout() + plt.savefig("output/rtt_mean_a.png") + plt.show() + + plt.figure(figsize=(6,4)) + sns.barplot(x=['Variant A'], y=[p95_rtt], palette="Set3") + plt.title("RTT 95-percentyl - wariant A") + plt.ylabel("RTT [ms]") + plt.tight_layout() + plt.savefig("output/rtt_p95_a.png") + plt.show() + + plt.figure(figsize=(6,4)) + sns.barplot(x=['Variant A'], y=[mean_throughput], palette="Set1") + plt.title("Średni throughput - wariant A") + plt.ylabel("Messages per second") + plt.tight_layout() + plt.savefig("output/throughput_a.png") + plt.show() diff --git a/analysis/e1anl/src/main.py b/analysis/e1anl/src/main.py new file mode 100644 index 0000000..b35eead --- /dev/null +++ b/analysis/e1anl/src/main.py @@ -0,0 +1,17 @@ +# main.py + +from agregate import collect_data +from analyze import analyze + +def main(): + DURATION_HOURS = 2 + DURATION_SECONDS = DURATION_HOURS * 3600 + + print("Collecting data...") + collect_data(DURATION_SECONDS) + + print("Analyzing data...") + analyze() + +if __name__ == "__main__": + main() diff --git a/analysis/rtt/Dockerfile b/analysis/rtt/Dockerfile new file mode 100644 index 0000000..cb58f62 --- /dev/null +++ b/analysis/rtt/Dockerfile @@ -0,0 +1,29 @@ +# FROM debian:bookworm-slim +# RUN apt-get update && apt-get install -y \ +# gcc \ +# make \ +# libmosquitto-dev \ +# && rm -rf /var/lib/apt/lists/* +# WORKDIR /app +# COPY . . +# RUN make + +FROM alpine:latest AS builder + +RUN apk add --no-cache \ + gcc \ + make \ + musl-dev \ + mosquitto-dev + +WORKDIR /app +COPY Makefile mqtt_rtt_logger.c ./ +RUN make + +FROM alpine:latest + +WORKDIR /app + +COPY --from=builder /app/build/mqtt_rtt_logger ./mqtt_rtt_logger + +CMD ["./mqtt_rtt_logger"] diff --git a/analysis/rtt/Makefile b/analysis/rtt/Makefile new file mode 100644 index 0000000..8b8d38e --- /dev/null +++ b/analysis/rtt/Makefile @@ -0,0 +1,22 @@ +TARGET = mqtt_rtt_logger +SRC_DIR = src +SRC = $(SRC_DIR)/mqtt_rtt_logger.c +OUT_DIR = build +CC = gcc +CFLAGS = -Wall -Wextra -O2 +LDFLAGS = -lmosquitto +DOCKER_IMAGE = mqtt_rtt_logger-c-mqtt-logger + +all: $(OUT_DIR) $(OUT_DIR)/$(TARGET) + +$(OUT_DIR): + mkdir -p $(OUT_DIR) + +$(OUT_DIR)/$(TARGET): $(SRC) + $(CC) $(CFLAGS) -o $@ $(SRC) $(LDFLAGS) + +clean: + rm -rf $(OUT_DIR) + +docker-build: $(OUT_DIR) + docker run --rm -v "$(PWD):/app" -w /app $(DOCKER_IMAGE) make all diff --git a/analysis/rtt/src/logger.c b/analysis/rtt/src/logger.c new file mode 100644 index 0000000..ed5f6e5 --- /dev/null +++ b/analysis/rtt/src/logger.c @@ -0,0 +1,86 @@ + +#include "logger.h" +#include +#include +#include +#include +#include + +#define WINDOW_SEC 1 +#define MAX_TIMES 1000 + +static FILE *csv_file = NULL; +static double msg_times[MAX_TIMES]; +static int msg_count = 0; + +static long long current_time_ms() { + struct timeval tv; + gettimeofday(&tv, NULL); + return ((long long)tv.tv_sec * 1000) + (tv.tv_usec / 1000); +} + +static double current_time_sec() { + struct timeval tv; + gettimeofday(&tv, NULL); + return (double)tv.tv_sec + (tv.tv_usec / 1e6); +} + +static int update_throughput_window(double now) { + int i, new_count = 0; + for (i = 0; i < msg_count; i++) { + if (now - msg_times[i] < WINDOW_SEC) { + msg_times[new_count++] = msg_times[i]; + } + } + msg_times[new_count++] = now; + msg_count = new_count; + return msg_count; +} + +void logger_init(const char *filename) { + csv_file = fopen(filename, "w"); + if (!csv_file) { + perror("Cannot open CSV file"); + exit(EXIT_FAILURE); + } + fprintf(csv_file, + "timestamp,sent_ms,received_ms,rtt_ms,throughput_msg_per_s\n"); + fflush(csv_file); +} + +void logger_cleanup() { + if (csv_file) { + fclose(csv_file); + csv_file = NULL; + } +} + +void logger_handle_message(const void *payload, int payloadlen) { + char *payload_str = malloc(payloadlen + 1); + if (!payload_str) + return; + memcpy(payload_str, payload, payloadlen); + payload_str[payloadlen] = '\0'; + + long long sent_ms = atoll(payload_str); + free(payload_str); + + long long received_ms = current_time_ms(); + long long rtt = received_ms - sent_ms; + + double now_sec = current_time_sec(); + int throughput = update_throughput_window(now_sec); + + time_t now = time(NULL); + struct tm *tm_info = localtime(&now); + char iso_time[32]; + strftime(iso_time, sizeof(iso_time), "%Y-%m-%dT%H:%M:%S", tm_info); + + if (csv_file) { + fprintf(csv_file, "%s,%lld,%lld,%lld,%d\n", iso_time, sent_ms, received_ms, + rtt, throughput); + fflush(csv_file); + } + + printf("RTT: %lld ms | Throughput: %d msg/s\n", rtt, throughput); +} diff --git a/analysis/rtt/src/logger.h b/analysis/rtt/src/logger.h new file mode 100644 index 0000000..f1602c9 --- /dev/null +++ b/analysis/rtt/src/logger.h @@ -0,0 +1,9 @@ + +#ifndef LOGGER_H +#define LOGGER_H + +void logger_init(const char *filename); +void logger_cleanup(); +void logger_handle_message(const void *payload, int payloadlen); + +#endif // LOGGER_H diff --git a/analysis/rtt/src/main.c b/analysis/rtt/src/main.c new file mode 100644 index 0000000..13f7716 --- /dev/null +++ b/analysis/rtt/src/main.c @@ -0,0 +1,28 @@ + +#include "logger.h" +#include "mqtt_client.h" +#include + +#define BROKER_ADDRESS "192.168.1.101" +#define BROKER_PORT 1883 +#define TOPIC "device/echo/in" +#define CSV_FILE "rtt_throughput_log.csv" + +int main() { + mqtt_client_t client; + + logger_init(CSV_FILE); + + if (mqtt_client_init(&client, BROKER_ADDRESS, BROKER_PORT, TOPIC) != 0) { + fprintf(stderr, "MQTT initialization error\n"); + return 1; + } + + printf("Connection established.Waiting for messages...\n"); + mqtt_client_loop(&client); + + logger_cleanup(); + mqtt_client_cleanup(&client); + + return 0; +} diff --git a/analysis/rtt/src/mqtt_client.c b/analysis/rtt/src/mqtt_client.c new file mode 100644 index 0000000..43a06d8 --- /dev/null +++ b/analysis/rtt/src/mqtt_client.c @@ -0,0 +1,48 @@ +#include "mqtt_client.h" +#include "logger.h" +#include +#include + +#define WINDOW_SEC 1 + +static void on_message(struct mosquitto *mosq, void *userdata, + const struct mosquitto_message *msg); + +int mqtt_client_init(mqtt_client_t *client, const char *broker_address, + int port, const char *topic) { + mosquitto_lib_init(); + client->mosq = mosquitto_new(NULL, true, NULL); + if (!client->mosq) { + fprintf(stderr, "Cannot create MQTT client\n"); + return 1; + } + + mosquitto_message_callback_set(client->mosq, on_message); + + if (mosquitto_connect(client->mosq, broker_address, port, 60) != + MOSQ_ERR_SUCCESS) { + fprintf(stderr, "Nie można połączyć się z brokerem MQTT\n"); + return 1; + } + + if (mosquitto_subscribe(client->mosq, NULL, topic, 0) != MOSQ_ERR_SUCCESS) { + fprintf(stderr, "Cannot subscribe to topic\n"); + return 1; + } + + return 0; +} + +void mqtt_client_cleanup(mqtt_client_t *client) { + mosquitto_destroy(client->mosq); + mosquitto_lib_cleanup(); +} + +void mqtt_client_loop(mqtt_client_t *client) { + mosquitto_loop_forever(client->mosq, -1, 1); +} + +static void on_message(struct mosquitto *mosq, void *userdata, + const struct mosquitto_message *msg) { + logger_handle_message(msg->payload, msg->payloadlen); +} diff --git a/analysis/rtt/src/mqtt_client.h b/analysis/rtt/src/mqtt_client.h new file mode 100644 index 0000000..781e742 --- /dev/null +++ b/analysis/rtt/src/mqtt_client.h @@ -0,0 +1,16 @@ + +#ifndef MQTT_CLIENT_H +#define MQTT_CLIENT_H + +#include + +typedef struct { + struct mosquitto *mosq; +} mqtt_client_t; + +int mqtt_client_init(mqtt_client_t *client, const char *broker_address, + int port, const char *topic); +void mqtt_client_cleanup(mqtt_client_t *client); +void mqtt_client_loop(mqtt_client_t *client); + +#endif // MQTT_CLIENT_H diff --git a/analysis/rtt/src/mqtt_rtt_logger.c b/analysis/rtt/src/mqtt_rtt_logger.c new file mode 100644 index 0000000..60e5d2d --- /dev/null +++ b/analysis/rtt/src/mqtt_rtt_logger.c @@ -0,0 +1,117 @@ +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// +// #define BROKER_ADDRESS "192.168.1.101" +// #define BROKER_PORT 1883 +// #define TOPIC "device/echo/in" +// #define CSV_FILE "rtt_throughput_log.csv" +// #define WINDOW_SEC 1 +// #define MAX_TIMES 1000 +// +// // Kolejka czasów przyjścia wiadomości (dla throughput) +// double msg_times[MAX_TIMES]; +// int msg_count = 0; +// +// // Zwraca aktualny czas w milisekundach +// long long current_time_ms() { +// struct timeval tv; +// gettimeofday(&tv, NULL); +// return ((long long)tv.tv_sec * 1000) + (tv.tv_usec / 1000); +// } +// +// // Zwraca aktualny czas w sekundach z ułamkiem +// double current_time_sec() { +// struct timeval tv; +// gettimeofday(&tv, NULL); +// return (double)tv.tv_sec + (tv.tv_usec / 1e6); +// } +// +// // Aktualizuje throughput window i zwraca liczbę wiadomości w oknie +// int update_throughput_window(double now) { +// int i, new_count = 0; +// for (i = 0; i < msg_count; i++) { +// if (now - msg_times[i] < WINDOW_SEC) { +// msg_times[new_count++] = msg_times[i]; +// } +// } +// msg_times[new_count++] = now; +// msg_count = new_count; +// return msg_count; +// } +// +// // Callback po odebraniu wiadomości +// void on_message(struct mosquitto *mosq, void *userdata, +// const struct mosquitto_message *msg) { +// char *payload = (char *)msg->payload; +// long long sent_ms = atoll(payload); +// long long received_ms = current_time_ms(); +// long long rtt = received_ms - sent_ms; +// +// double now_sec = current_time_sec(); +// int throughput = update_throughput_window(now_sec); +// +// // Timestamp ISO 8601 +// time_t now = time(NULL); +// struct tm *tm_info = localtime(&now); +// char iso_time[32]; +// strftime(iso_time, sizeof(iso_time), "%Y-%m-%dT%H:%M:%S", tm_info); +// +// // Zapisz do CSV +// FILE *f = fopen(CSV_FILE, "a"); +// if (f) { +// fprintf(f, "%s,%lld,%lld,%lld,%d\n", iso_time, sent_ms, received_ms, rtt, +// throughput); +// fclose(f); +// } +// +// // Wydruk +// printf("RTT: %lld ms | Throughput: %d msg/s\n", rtt, throughput); +// } +// +// int main() { +// printf("Start programu\n"); +// +// FILE *f = fopen(CSV_FILE, "w"); +// if (!f) { +// perror("Nie można otworzyć pliku CSV"); +// return 1; +// } +// fprintf(f, "timestamp,sent_ms,received_ms,rtt_ms,throughput_msg_per_s\n"); +// fclose(f); +// printf("Plik CSV utworzony\n"); +// +// mosquitto_lib_init(); +// struct mosquitto *mosq = mosquitto_new(NULL, true, NULL); +// if (!mosq) { +// fprintf(stderr, "Błąd tworzenia klienta MQTT\n"); +// return 1; +// } +// printf("Klient MQTT utworzony\n"); +// +// mosquitto_message_callback_set(mosq, on_message); +// +// int rc = mosquitto_connect(mosq, BROKER_ADDRESS, BROKER_PORT, 60); +// if (rc != MOSQ_ERR_SUCCESS) { +// fprintf(stderr, "Nie można połączyć się z brokerem MQTT, kod błędu: +// %d\n", +// rc); +// return 1; +// } +// printf("Połączono z brokerem MQTT\n"); +// +// mosquitto_subscribe(mosq, NULL, TOPIC, 0); +// printf("Subskrypcja tematu %s ustawiona\n", TOPIC); +// +// printf("Oczekiwanie na wiadomości...\n"); +// mosquitto_loop_forever(mosq, -1, 1); +// +// mosquitto_destroy(mosq); +// mosquitto_lib_cleanup(); +// +// return 0; +// } -- cgit v1.2.3