From 4698910f842c322b80ebd1cf19a4e32c0fb2249d Mon Sep 17 00:00:00 2001 From: Filip Wandzio Date: Wed, 15 Oct 2025 01:50:43 +0200 Subject: Implement additional metrics --- firmware/src/mqtt.c | 143 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 126 insertions(+), 17 deletions(-) diff --git a/firmware/src/mqtt.c b/firmware/src/mqtt.c index b171437..b3553eb 100644 --- a/firmware/src/mqtt.c +++ b/firmware/src/mqtt.c @@ -11,9 +11,24 @@ static const char *TAG = "mqtt"; static esp_mqtt_client_handle_t client; -/** - * @brief Obsługa zdarzeń MQTT - */ +// Zliczniki i pomiary +static long long last_sent = 0; +static long long last_rtt = 0; +static long long last_rtt_prev = 0; +static int sent_count = 0; +static int received_count = 0; +static unsigned long tx_bytes = 0; +static unsigned long rx_bytes = 0; + +// Kanały pomiarów +#define TOPIC_RTT "device/metrics/rtt" +#define TOPIC_JITTER "device/metrics/jitter" +#define TOPIC_LOSS "device/metrics/loss" +#define TOPIC_THROUGHPUT "device/metrics/throughput" +#define TOPIC_CPU "device/metrics/cpu" +#define TOPIC_NET_TX "device/metrics/net_tx" +#define TOPIC_NET_RX "device/metrics/net_rx" + static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { @@ -22,7 +37,7 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT connected!"); - esp_mqtt_client_subscribe(event->client, SUB_TOPIC, 1); + esp_mqtt_client_subscribe(event->client, PUB_TOPIC, 1); break; case MQTT_EVENT_DATA: { @@ -34,21 +49,23 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, memcpy(data, event->data, event->data_len); data[event->data_len] = '\0'; - ESP_LOGI(TAG, "MQTT Msg received | Topic: %s | Data: %s", topic, - data); + rx_bytes += event->data_len; - // Echo wiadomości do PUB_TOPIC tylko jeśli timestamp jest - // sensowny + long long now = esp_timer_get_time() / 1000ULL; char *endptr; long long ts = strtoll(data, &endptr, 10); + if (endptr != data && *endptr == '\0' && ts > 0 && ts < 1e13) { - esp_mqtt_client_publish(event->client, PUB_TOPIC, data, - 0, 1, 0); - } else { - ESP_LOGW(TAG, "Nieprawidłowe dane, ignorowane: %s", - data); + long long rtt = now - ts; + if (rtt >= 0 && rtt < 10000) { + last_rtt_prev = last_rtt; + last_rtt = rtt; + received_count++; + esp_mqtt_client_publish( + event->client, SUB_TOPIC, data, 0, 1, 0); + tx_bytes += strlen(data); + } } - break; } @@ -65,9 +82,96 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, } } -/** - * @brief Uruchamia klienta MQTT - */ +static void mqtt_test_task(void *pvParameters) +{ + int freq = *((int *)pvParameters); + int delay_ms = 1000 / freq; + + while (1) { + last_sent = esp_timer_get_time() / 1000ULL; + char msg[32]; + snprintf(msg, sizeof(msg), "%lld", last_sent); + esp_mqtt_client_publish(client, PUB_TOPIC, msg, 0, 1, 0); + tx_bytes += strlen(msg); + sent_count++; + vTaskDelay(pdMS_TO_TICKS(delay_ms)); + } +} + +static void mqtt_metrics_task(void *pvParameters) +{ + long long last_time = esp_timer_get_time(); + while (1) { + long long now = esp_timer_get_time(); + long long dt = (now - last_time) / 1000ULL; + last_time = now; + + // RTT + if (last_rtt > 0) { + char buf[32]; + snprintf(buf, sizeof(buf), "%lld", last_rtt); + esp_mqtt_client_publish(client, TOPIC_RTT, buf, 0, 1, + 0); + tx_bytes += strlen(buf); + } + + // Jitter + if (last_rtt_prev > 0) { + long long jitter = llabs(last_rtt - last_rtt_prev); + char buf[32]; + snprintf(buf, sizeof(buf), "%lld", jitter); + esp_mqtt_client_publish(client, TOPIC_JITTER, buf, 0, 1, + 0); + tx_bytes += strlen(buf); + } + + // Packet loss + int loss = sent_count > 0 ? (sent_count - received_count) * + 100 / sent_count + : 0; + char buf_loss[16]; + snprintf(buf_loss, sizeof(buf_loss), "%d", loss); + esp_mqtt_client_publish(client, TOPIC_LOSS, buf_loss, 0, 1, 0); + tx_bytes += strlen(buf_loss); + + // Throughput + char buf_thr[16]; + snprintf(buf_thr, sizeof(buf_thr), "%d", received_count); + esp_mqtt_client_publish(client, TOPIC_THROUGHPUT, buf_thr, 0, 1, + 0); + tx_bytes += strlen(buf_thr); + + // CPU usage + UBaseType_t free_heap = xPortGetFreeHeapSize(); + UBaseType_t min_free_heap = xPortGetMinimumEverFreeHeapSize(); + int cpu_load = + (int)((1.0 - ((float)free_heap / (float)min_free_heap)) * + 100.0); + if (cpu_load < 0) + cpu_load = 0; + if (cpu_load > 100) + cpu_load = 100; + char buf_cpu[16]; + snprintf(buf_cpu, sizeof(buf_cpu), "%d", cpu_load); + esp_mqtt_client_publish(client, TOPIC_CPU, buf_cpu, 0, 1, 0); + tx_bytes += strlen(buf_cpu); + + // Network TX/RX bytes + char buf_tx[32], buf_rx[32]; + snprintf(buf_tx, sizeof(buf_tx), "%lu", tx_bytes); + snprintf(buf_rx, sizeof(buf_rx), "%lu", rx_bytes); + esp_mqtt_client_publish(client, TOPIC_NET_TX, buf_tx, 0, 1, 0); + esp_mqtt_client_publish(client, TOPIC_NET_RX, buf_rx, 0, 1, 0); + + // Reset liczniki na kolejny interwał + tx_bytes = 0; + rx_bytes = 0; + received_count = 0; + + vTaskDelay(pdMS_TO_TICKS(1000)); + } +} + void mqtt_app_start(void) { esp_mqtt_client_config_t mqtt_cfg = { @@ -80,4 +184,9 @@ void mqtt_app_start(void) esp_mqtt_client_start(client); ESP_LOGI(TAG, "MQTT client started"); + + static int freq = 20; + xTaskCreate(mqtt_test_task, "mqtt_test_task", 4096, &freq, 5, NULL); + xTaskCreate(mqtt_metrics_task, "mqtt_metrics_task", 4096, NULL, 5, + NULL); } -- cgit v1.2.3