/*
 * MQTT round-trip test client.
 *
 * Publishes millisecond timestamps on PUB_TOPIC at a fixed rate, measures the
 * round-trip time when the same payload comes back through the subscription,
 * echoes it on SUB_TOPIC, and once per second publishes derived metrics
 * (RTT, jitter, loss, throughput, heap-based "cpu" figure, TX/RX byte counts).
 *
 * PUB_TOPIC, SUB_TOPIC and MQTT_URI are not defined here; presumably they come
 * from config.h.
 */
#include "mqtt.h"
#include "config.h"

#include "esp_err.h"
#include "esp_log.h"
#include "esp_timer.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"

/*
 * NOTE(review): the original file had three `#include` directives whose header
 * names were lost. These are the standard headers the code below needs
 * (snprintf; strtoll/llabs; memcpy/strlen) - confirm against the real file.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static const char *TAG = "mqtt";
static esp_mqtt_client_handle_t client;

/* Measurement topics */
#define TOPIC_RTT        "device/metrics/rtt"
#define TOPIC_JITTER     "device/metrics/jitter"
#define TOPIC_LOSS       "device/metrics/loss"
#define TOPIC_THROUGHPUT "device/metrics/throughput"
#define TOPIC_CPU        "device/metrics/cpu"
#define TOPIC_NET_TX     "device/metrics/net_tx"
#define TOPIC_NET_RX     "device/metrics/net_rx"

/* Exclusive upper bound for an accepted timestamp, in milliseconds. */
#define TS_LIMIT_MS 10000000000000LL

enum {
    MQTT_QOS          = 1,
    TS_PAYLOAD_MAX    = 32,    /* payload buffer incl. NUL; valid timestamps have <= 13 digits */
    RTT_LIMIT_MS      = 10000, /* exclusive upper bound for a plausible RTT */
    METRICS_PERIOD_MS = 1000,
    DEFAULT_FREQ_HZ   = 20,
    TASK_STACK_BYTES  = 4096,
    TASK_PRIORITY     = 5,
};

/*
 * Counters and measurements.
 *
 * These are touched from three contexts (the MQTT event handler, the test task
 * and the metrics task), so every access goes through stats_lock.
 */
static portMUX_TYPE stats_lock = portMUX_INITIALIZER_UNLOCKED;
static long long last_rtt = 0;
static long long last_rtt_prev = 0;
static int sent_count = 0;
static int received_count = 0;
static unsigned long tx_bytes = 0;
static unsigned long rx_bytes = 0;

/* Current esp_timer time converted from microseconds to milliseconds. */
static long long now_ms(void)
{
    return (long long)(esp_timer_get_time() / 1000);
}

/*
 * Publish a NUL-terminated payload with QoS MQTT_QOS and, on success, add its
 * length to tx_bytes.
 *
 * Returns the value of esp_mqtt_client_publish(); negative means the message
 * was not accepted, in which case nothing is counted.
 */
static int publish_counted(esp_mqtt_client_handle_t handle, const char *topic,
                           const char *payload)
{
    int msg_id = esp_mqtt_client_publish(handle, topic, payload, 0, MQTT_QOS, 0);
    if (msg_id < 0) {
        /* Debug level only: the test task calls this many times per second. */
        ESP_LOGD(TAG, "publish to %s failed (%d)", topic, msg_id);
        return msg_id;
    }

    size_t len = strlen(payload);
    taskENTER_CRITICAL(&stats_lock);
    tx_bytes += len;
    taskEXIT_CRITICAL(&stats_lock);
    return msg_id;
}

/* Format a signed integer as decimal text and publish it on `topic`. */
static void publish_number(const char *topic, long long value)
{
    char buf[32];
    snprintf(buf, sizeof(buf), "%lld", value);
    publish_counted(client, topic, buf);
}

/*
 * Value published on TOPIC_CPU: (1 - free_now / min_free) * 100, clamped to
 * 0..100.
 *
 * NOTE(review): this is derived from heap sizes, not CPU time. Since the
 * current free heap should never be below the minimum-ever free heap, the
 * result appears to always be 0. A real CPU-load figure would need FreeRTOS
 * run-time statistics - confirm the intended metric.
 */
static int heap_load_percent(size_t free_now, size_t min_free)
{
    if (min_free == 0) {
        return 0; /* avoid dividing by zero */
    }

    double load = (1.0 - (double)free_now / (double)min_free) * 100.0;

    /* Clamp before converting: an out-of-range double -> int cast is undefined. */
    if (load < 0.0) {
        return 0;
    }
    if (load > 100.0) {
        return 100;
    }
    return (int)load;
}

/*
 * Handle one MQTT_EVENT_DATA event.
 *
 * The payload is expected to be a decimal millisecond timestamp. When it
 * parses cleanly and yields a plausible RTT, the RTT history and the receive
 * counter are updated and the payload is echoed on SUB_TOPIC.
 */
static void handle_data_event(esp_mqtt_event_handle_t event)
{
    if (event->data_len > 0) {
        taskENTER_CRITICAL(&stats_lock);
        rx_bytes += (unsigned long)event->data_len;
        taskEXIT_CRITICAL(&stats_lock);
    }

    /*
     * The payload length is controlled by the sender, so copy into a fixed
     * buffer instead of a stack VLA. Anything that does not fit cannot be a
     * valid timestamp anyway.
     */
    if (event->data == NULL || event->data_len <= 0 ||
        event->data_len >= TS_PAYLOAD_MAX) {
        return;
    }

    char payload[TS_PAYLOAD_MAX];
    memcpy(payload, event->data, (size_t)event->data_len);
    payload[event->data_len] = '\0';

    long long now = now_ms();

    char *end = NULL;
    long long ts = strtoll(payload, &end, 10);
    if (end == payload || *end != '\0' || ts <= 0 || ts >= TS_LIMIT_MS) {
        return; /* not a clean, in-range decimal timestamp */
    }

    long long rtt = now - ts;
    if (rtt < 0 || rtt >= RTT_LIMIT_MS) {
        return;
    }

    taskENTER_CRITICAL(&stats_lock);
    last_rtt_prev = last_rtt;
    last_rtt = rtt;
    received_count++;
    taskEXIT_CRITICAL(&stats_lock);

    publish_counted(event->client, SUB_TOPIC, payload);
}

/*
 * Event callback registered with the MQTT client for all event ids.
 *
 * Subscribes to PUB_TOPIC on connect, processes incoming data, and logs
 * disconnects and errors. Dispatch uses the id stored in the event structure.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
                               int32_t event_id, void *event_data)
{
    (void)handler_args;
    (void)base;
    (void)event_id;

    esp_mqtt_event_handle_t event = event_data;

    switch (event->event_id) {
    case MQTT_EVENT_CONNECTED:
        ESP_LOGI(TAG, "MQTT connected!");
        if (esp_mqtt_client_subscribe(event->client, PUB_TOPIC, MQTT_QOS) < 0) {
            ESP_LOGE(TAG, "subscribe to %s failed", PUB_TOPIC);
        }
        break;

    case MQTT_EVENT_DATA:
        handle_data_event(event);
        break;

    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGW(TAG, "MQTT disconnected");
        break;

    case MQTT_EVENT_ERROR:
        ESP_LOGE(TAG, "MQTT error occurred");
        break;

    default:
        break;
    }
}

/*
 * Task: publish the current millisecond timestamp on PUB_TOPIC at a fixed rate.
 *
 * pvParameters points to an int holding the publish frequency in Hz. A NULL
 * pointer or a non-positive value falls back to DEFAULT_FREQ_HZ.
 */
static void mqtt_test_task(void *pvParameters)
{
    int freq = (pvParameters != NULL) ? *(const int *)pvParameters : DEFAULT_FREQ_HZ;
    if (freq <= 0) {
        ESP_LOGW(TAG, "invalid publish frequency %d, using %d Hz", freq,
                 (int)DEFAULT_FREQ_HZ);
        freq = DEFAULT_FREQ_HZ;
    }

    /* Always yield for at least one tick so lower-priority tasks can run. */
    TickType_t delay_ticks = pdMS_TO_TICKS(1000 / freq);
    if (delay_ticks == 0) {
        delay_ticks = 1;
    }

    for (;;) {
        char msg[32];
        snprintf(msg, sizeof(msg), "%lld", now_ms());

        /* Only count messages the client accepted; failures would skew loss. */
        if (publish_counted(client, PUB_TOPIC, msg) >= 0) {
            taskENTER_CRITICAL(&stats_lock);
            sent_count++;
            taskEXIT_CRITICAL(&stats_lock);
        }

        vTaskDelay(delay_ticks);
    }
}

/*
 * Task: once per METRICS_PERIOD_MS, publish the metrics gathered since the
 * previous pass and reset the per-interval counters.
 */
static void mqtt_metrics_task(void *pvParameters)
{
    (void)pvParameters;

    for (;;) {
        /*
         * Snapshot and reset the message counters in one critical section so
         * no update is lost and sent/received cover the same interval.
         */
        taskENTER_CRITICAL(&stats_lock);
        long long rtt = last_rtt;
        long long rtt_prev = last_rtt_prev;
        int sent = sent_count;
        int received = received_count;
        sent_count = 0;
        received_count = 0;
        taskEXIT_CRITICAL(&stats_lock);

        /* RTT: the most recent sample (not reset, so it repeats when idle) */
        if (rtt > 0) {
            publish_number(TOPIC_RTT, rtt);
        }

        /* Jitter: absolute difference between the last two RTT samples */
        if (rtt_prev > 0) {
            publish_number(TOPIC_JITTER, llabs(rtt - rtt_prev));
        }

        /*
         * Packet loss in percent for this interval. Messages still in flight
         * at the interval boundary can make received exceed sent, hence the
         * clamp.
         */
        int loss = 0;
        if (sent > 0) {
            loss = (sent - received) * 100 / sent;
            if (loss < 0) {
                loss = 0;
            }
            if (loss > 100) {
                loss = 100;
            }
        }
        publish_number(TOPIC_LOSS, loss);

        /* Throughput: valid messages received during this interval */
        publish_number(TOPIC_THROUGHPUT, received);

        /* "CPU usage" (see heap_load_percent for what is actually measured) */
        size_t free_heap = xPortGetFreeHeapSize();
        size_t min_free_heap = xPortGetMinimumEverFreeHeapSize();
        publish_number(TOPIC_CPU, heap_load_percent(free_heap, min_free_heap));

        /*
         * Network TX/RX bytes. Taken after the publishes above so this
         * interval's metric traffic is included; the two publishes below are
         * counted towards the next interval.
         */
        taskENTER_CRITICAL(&stats_lock);
        unsigned long tx = tx_bytes;
        unsigned long rx = rx_bytes;
        tx_bytes = 0;
        rx_bytes = 0;
        taskEXIT_CRITICAL(&stats_lock);

        char buf_tx[32];
        char buf_rx[32];
        snprintf(buf_tx, sizeof(buf_tx), "%lu", tx);
        snprintf(buf_rx, sizeof(buf_rx), "%lu", rx);
        publish_counted(client, TOPIC_NET_TX, buf_tx);
        publish_counted(client, TOPIC_NET_RX, buf_rx);

        vTaskDelay(pdMS_TO_TICKS(METRICS_PERIOD_MS));
    }
}

/*
 * Create and start the MQTT client for MQTT_URI, then spawn the test and
 * metrics tasks.
 *
 * On any client set-up failure the error is logged, the client is destroyed
 * and no task is created.
 */
void mqtt_app_start(void)
{
    const esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = MQTT_URI,
    };

    client = esp_mqtt_client_init(&mqtt_cfg);
    if (client == NULL) {
        ESP_LOGE(TAG, "MQTT client init failed");
        return;
    }

    esp_err_t err = esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID,
                                                   mqtt_event_handler, NULL);
    if (err == ESP_OK) {
        err = esp_mqtt_client_start(client);
    }
    if (err != ESP_OK) {
        ESP_LOGE(TAG, "MQTT client start failed: %s", esp_err_to_name(err));
        esp_mqtt_client_destroy(client);
        client = NULL;
        return;
    }
    ESP_LOGI(TAG, "MQTT client started");

    /* Static storage: the test task reads this through its parameter pointer. */
    static int freq = DEFAULT_FREQ_HZ;

    if (xTaskCreate(mqtt_test_task, "mqtt_test_task", TASK_STACK_BYTES, &freq,
                    TASK_PRIORITY, NULL) != pdPASS) {
        ESP_LOGE(TAG, "failed to create mqtt_test_task");
    }
    if (xTaskCreate(mqtt_metrics_task, "mqtt_metrics_task", TASK_STACK_BYTES, NULL,
                    TASK_PRIORITY, NULL) != pdPASS) {
        ESP_LOGE(TAG, "failed to create mqtt_metrics_task");
    }
}