From 1ba21da6cbc63c0c549fb92731e25bedc482eb51 Mon Sep 17 00:00:00 2001 From: Filip Wandzio Date: Thu, 4 Sep 2025 22:25:39 +0200 Subject: Unify the directory, add new analysis methods, unify the code style Signed-off-by: Filip Wandzio --- analysis/e1anl/src/agregate.py | 71 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 analysis/e1anl/src/agregate.py (limited to 'analysis/e1anl/src/agregate.py') diff --git a/analysis/e1anl/src/agregate.py b/analysis/e1anl/src/agregate.py new file mode 100644 index 0000000..1054288 --- /dev/null +++ b/analysis/e1anl/src/agregate.py @@ -0,0 +1,71 @@ +# mqtt_receiver.py + +import paho.mqtt.client as mqtt +import csv +import time +from datetime import datetime +from collections import deque +import os +os.makedirs("output", exist_ok=True) + +BROKER = "192.168.1.101" +SUB_TOPIC = "device/echo/in" +CSV_FILE = "output/rtt_throughput_log.csv" +WINDOW_SEC = 1 + +msg_times = deque() +should_stop = False # flag to stop loop externally + + +def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to broker") + client.subscribe(SUB_TOPIC) + else: + print("Failed to connect, return code:", rc) + + +def on_message(client, userdata, msg): + try: + payload_str = msg.payload.decode() + sent_ms = int(payload_str) + received_ms = int(time.time() * 1000) + rtt = received_ms - sent_ms + timestamp = datetime.now().isoformat() + + now_sec = time.time() + msg_times.append(now_sec) + while msg_times and msg_times[0] < now_sec - WINDOW_SEC: + msg_times.popleft() + throughput = len(msg_times) / WINDOW_SEC + + with open(CSV_FILE, "a", newline="") as f: + writer = csv.writer(f) + writer.writerow([timestamp, sent_ms, received_ms, rtt, throughput]) + + except Exception as e: + print("Error processing message:", e) + + +def collect_data(duration_sec): + global should_stop + should_stop = False + + # Start CSV log + with open(CSV_FILE, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["timestamp", "sent_ms", "received_ms", "rtt_ms", "throughput_msg_per_s"]) + + client = mqtt.Client() + client.on_connect = on_connect + client.on_message = on_message + client.connect(BROKER, 1883, 60) + + client.loop_start() + print(f"Zbieranie danych przez {duration_sec / 60:.1f} minut...") + + time.sleep(duration_sec) + + print("Zbieranie danych zakończone.") + client.loop_stop() + client.disconnect() -- cgit v1.2.3