# mqtt_receiver.py
"""Subscribe to an MQTT echo topic and log round-trip time and throughput.

Every received message is expected to carry, as decimal text, the sender's
timestamp in milliseconds. For each message one CSV row is appended with the
send time, the receive time, their difference (RTT) and the number of
messages seen during the last ``WINDOW_SEC`` seconds.
"""
import csv
import os
import time
from collections import deque
from datetime import datetime

import paho.mqtt.client as mqtt

# The CSV file lives in this directory; create it on import so that the
# writers below never fail on a missing folder.
os.makedirs("output", exist_ok=True)

BROKER = "192.168.1.103"
SUB_TOPIC = "device/echo/in"
CSV_FILE = "output/rtt_throughput_log.csv"
WINDOW_SEC = 1

# How often collect_data() wakes up to check the should_stop flag.
_STOP_POLL_SEC = 0.1

# Arrival times (seconds since the epoch) of the messages that fall inside
# the current throughput window; oldest entry on the left.
msg_times = deque()

# Flag to stop the collection loop externally. Set it as a module attribute
# (``mqtt_receiver.should_stop = True``) while collect_data() is running;
# collect_data() resets it to False when it starts.
should_stop = False


def on_connect(client, userdata, flags, rc):
    """Subscribe to ``SUB_TOPIC`` once the broker accepts the connection.

    Uses the 4-argument (version 1) paho callback signature. A return code
    of 0 means success; any other code is only reported on stdout.
    """
    if rc == 0:
        print("Connected to broker")
        client.subscribe(SUB_TOPIC)
    else:
        print("Failed to connect, return code:", rc)


def on_message(client, userdata, msg):
    """Append one CSV row describing the received message.

    The payload is decoded and parsed as an integer send time in
    milliseconds. RTT is the local receive time minus that value, so it is
    only meaningful when sender and receiver clocks are comparable
    (NOTE(review): presumably the same host echoes its own timestamp --
    confirm). Throughput is the number of messages received within the last
    ``WINDOW_SEC`` seconds divided by ``WINDOW_SEC``.
    """
    # Broad catch on purpose: this runs as a callback of the MQTT network
    # loop, and one malformed payload or failed write must not stop it.
    try:
        sent_ms = int(msg.payload.decode())

        # Read the clock once so the RTT and the throughput window are
        # based on the same instant.
        now_sec = time.time()
        received_ms = int(now_sec * 1000)
        rtt = received_ms - sent_ms
        timestamp = datetime.now().isoformat()

        # Sliding window: record this arrival, then drop everything older
        # than WINDOW_SEC.
        msg_times.append(now_sec)
        cutoff = now_sec - WINDOW_SEC
        while msg_times and msg_times[0] < cutoff:
            msg_times.popleft()
        throughput = len(msg_times) / WINDOW_SEC

        with open(CSV_FILE, "a", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow(
                [timestamp, sent_ms, received_ms, rtt, throughput]
            )
    except Exception as e:
        print("Error processing message:", e)


def _make_client():
    """Create an MQTT client that works with paho-mqtt 1.x and 2.x.

    paho-mqtt 2.0 added a callback API version argument to ``Client``;
    version 1 is requested because on_connect() uses the 4-argument
    signature. Older releases have no such enum and take no argument.
    """
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        return mqtt.Client()
    return mqtt.Client(api_versions.VERSION1)


def collect_data(duration_sec):
    """Collect messages for ``duration_sec`` seconds and log them to CSV.

    Truncates ``CSV_FILE`` and writes the header row, connects to
    ``BROKER`` on port 1883, runs the MQTT network loop in a background
    thread, and returns when the time has elapsed or when the module-level
    ``should_stop`` flag becomes true, whichever comes first.

    Errors raised while connecting to the broker propagate to the caller.
    Once the network loop has been started it is always stopped and the
    client disconnected, even if the wait is interrupted.
    """
    global should_stop
    should_stop = False

    # Start CSV log
    with open(CSV_FILE, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(
            ["timestamp", "sent_ms", "received_ms", "rtt_ms",
             "throughput_msg_per_s"]
        )

    client = _make_client()
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(BROKER, 1883, 60)
    client.loop_start()
    try:
        print(f"Zbieranie danych przez {duration_sec / 60:.1f} minut...")

        # Sleep in short slices instead of one long sleep so that the
        # should_stop flag is honoured promptly. The monotonic clock keeps
        # the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + duration_sec
        while not should_stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, _STOP_POLL_SEC))

        print("Zbieranie danych zakończone.")
    finally:
        client.loop_stop()
        client.disconnect()