aboutsummaryrefslogtreecommitdiffstats
path: root/analysis/e1anl/src/agregate.py
diff options
context:
space:
mode:
authorFilip Wandzio <contact@philw.dev>2025-09-04 22:25:39 +0200
committerFilip Wandzio <contact@philw.dev>2025-09-04 22:25:39 +0200
commit1ba21da6cbc63c0c549fb92731e25bedc482eb51 (patch)
treeddf6fc2259a2495f8de336a07873cc3c6796785e /analysis/e1anl/src/agregate.py
parente00f3a9ede1b8e46b480bd68daf48da0bb08acae (diff)
downloade1-1ba21da6cbc63c0c549fb92731e25bedc482eb51.tar.gz
e1-1ba21da6cbc63c0c549fb92731e25bedc482eb51.zip
Unify the directory, add new analysis methods, unify the code style
Signed-off-by: Filip Wandzio <contact@philw.dev>
Diffstat (limited to 'analysis/e1anl/src/agregate.py')
-rw-r--r--analysis/e1anl/src/agregate.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/analysis/e1anl/src/agregate.py b/analysis/e1anl/src/agregate.py
new file mode 100644
index 0000000..1054288
--- /dev/null
+++ b/analysis/e1anl/src/agregate.py
@@ -0,0 +1,71 @@
1# mqtt_receiver.py
2
3import paho.mqtt.client as mqtt
4import csv
5import time
6from datetime import datetime
7from collections import deque
8import os
9os.makedirs("output", exist_ok=True)
10
11BROKER = "192.168.1.101"
12SUB_TOPIC = "device/echo/in"
13CSV_FILE = "output/rtt_throughput_log.csv"
14WINDOW_SEC = 1
15
16msg_times = deque()
17should_stop = False # flag to stop loop externally
18
19
20def on_connect(client, userdata, flags, rc):
21 if rc == 0:
22 print("Connected to broker")
23 client.subscribe(SUB_TOPIC)
24 else:
25 print("Failed to connect, return code:", rc)
26
27
28def on_message(client, userdata, msg):
29 try:
30 payload_str = msg.payload.decode()
31 sent_ms = int(payload_str)
32 received_ms = int(time.time() * 1000)
33 rtt = received_ms - sent_ms
34 timestamp = datetime.now().isoformat()
35
36 now_sec = time.time()
37 msg_times.append(now_sec)
38 while msg_times and msg_times[0] < now_sec - WINDOW_SEC:
39 msg_times.popleft()
40 throughput = len(msg_times) / WINDOW_SEC
41
42 with open(CSV_FILE, "a", newline="") as f:
43 writer = csv.writer(f)
44 writer.writerow([timestamp, sent_ms, received_ms, rtt, throughput])
45
46 except Exception as e:
47 print("Error processing message:", e)
48
49
50def collect_data(duration_sec):
51 global should_stop
52 should_stop = False
53
54 # Start CSV log
55 with open(CSV_FILE, "w", newline="") as f:
56 writer = csv.writer(f)
57 writer.writerow(["timestamp", "sent_ms", "received_ms", "rtt_ms", "throughput_msg_per_s"])
58
59 client = mqtt.Client()
60 client.on_connect = on_connect
61 client.on_message = on_message
62 client.connect(BROKER, 1883, 60)
63
64 client.loop_start()
65 print(f"Zbieranie danych przez {duration_sec / 60:.1f} minut...")
66
67 time.sleep(duration_sec)
68
69 print("Zbieranie danych zakończone.")
70 client.loop_stop()
71 client.disconnect()