diff options
Diffstat (limited to 'analysis/e1anl/src/agregate.py')
| -rw-r--r-- | analysis/e1anl/src/agregate.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/analysis/e1anl/src/agregate.py b/analysis/e1anl/src/agregate.py new file mode 100644 index 0000000..1054288 --- /dev/null +++ b/analysis/e1anl/src/agregate.py | |||
| @@ -0,0 +1,71 @@ | |||
| 1 | # mqtt_receiver.py | ||
| 2 | |||
| 3 | import paho.mqtt.client as mqtt | ||
| 4 | import csv | ||
| 5 | import time | ||
| 6 | from datetime import datetime | ||
| 7 | from collections import deque | ||
| 8 | import os | ||
| 9 | os.makedirs("output", exist_ok=True) | ||
| 10 | |||
| 11 | BROKER = "192.168.1.101" | ||
| 12 | SUB_TOPIC = "device/echo/in" | ||
| 13 | CSV_FILE = "output/rtt_throughput_log.csv" | ||
| 14 | WINDOW_SEC = 1 | ||
| 15 | |||
| 16 | msg_times = deque() | ||
| 17 | should_stop = False # flag to stop loop externally | ||
| 18 | |||
| 19 | |||
| 20 | def on_connect(client, userdata, flags, rc): | ||
| 21 | if rc == 0: | ||
| 22 | print("Connected to broker") | ||
| 23 | client.subscribe(SUB_TOPIC) | ||
| 24 | else: | ||
| 25 | print("Failed to connect, return code:", rc) | ||
| 26 | |||
| 27 | |||
| 28 | def on_message(client, userdata, msg): | ||
| 29 | try: | ||
| 30 | payload_str = msg.payload.decode() | ||
| 31 | sent_ms = int(payload_str) | ||
| 32 | received_ms = int(time.time() * 1000) | ||
| 33 | rtt = received_ms - sent_ms | ||
| 34 | timestamp = datetime.now().isoformat() | ||
| 35 | |||
| 36 | now_sec = time.time() | ||
| 37 | msg_times.append(now_sec) | ||
| 38 | while msg_times and msg_times[0] < now_sec - WINDOW_SEC: | ||
| 39 | msg_times.popleft() | ||
| 40 | throughput = len(msg_times) / WINDOW_SEC | ||
| 41 | |||
| 42 | with open(CSV_FILE, "a", newline="") as f: | ||
| 43 | writer = csv.writer(f) | ||
| 44 | writer.writerow([timestamp, sent_ms, received_ms, rtt, throughput]) | ||
| 45 | |||
| 46 | except Exception as e: | ||
| 47 | print("Error processing message:", e) | ||
| 48 | |||
| 49 | |||
| 50 | def collect_data(duration_sec): | ||
| 51 | global should_stop | ||
| 52 | should_stop = False | ||
| 53 | |||
| 54 | # Start CSV log | ||
| 55 | with open(CSV_FILE, "w", newline="") as f: | ||
| 56 | writer = csv.writer(f) | ||
| 57 | writer.writerow(["timestamp", "sent_ms", "received_ms", "rtt_ms", "throughput_msg_per_s"]) | ||
| 58 | |||
| 59 | client = mqtt.Client() | ||
| 60 | client.on_connect = on_connect | ||
| 61 | client.on_message = on_message | ||
| 62 | client.connect(BROKER, 1883, 60) | ||
| 63 | |||
| 64 | client.loop_start() | ||
| 65 | print(f"Zbieranie danych przez {duration_sec / 60:.1f} minut...") | ||
| 66 | |||
| 67 | time.sleep(duration_sec) | ||
| 68 | |||
| 69 | print("Zbieranie danych zakończone.") | ||
| 70 | client.loop_stop() | ||
| 71 | client.disconnect() | ||
