diff options
Diffstat (limited to 'firmware/src/mqtt.c')
| -rw-r--r-- | firmware/src/mqtt.c | 143 |
1 files changed, 126 insertions, 17 deletions
diff --git a/firmware/src/mqtt.c b/firmware/src/mqtt.c index b171437..b3553eb 100644 --- a/firmware/src/mqtt.c +++ b/firmware/src/mqtt.c | |||
| @@ -11,9 +11,24 @@ | |||
| 11 | static const char *TAG = "mqtt"; | 11 | static const char *TAG = "mqtt"; |
| 12 | static esp_mqtt_client_handle_t client; | 12 | static esp_mqtt_client_handle_t client; |
| 13 | 13 | ||
| 14 | /** | 14 | // Zliczniki i pomiary |
| 15 | * @brief Obsługa zdarzeń MQTT | 15 | static long long last_sent = 0; |
| 16 | */ | 16 | static long long last_rtt = 0; |
| 17 | static long long last_rtt_prev = 0; | ||
| 18 | static int sent_count = 0; | ||
| 19 | static int received_count = 0; | ||
| 20 | static unsigned long tx_bytes = 0; | ||
| 21 | static unsigned long rx_bytes = 0; | ||
| 22 | |||
| 23 | // Kanały pomiarów | ||
| 24 | #define TOPIC_RTT "device/metrics/rtt" | ||
| 25 | #define TOPIC_JITTER "device/metrics/jitter" | ||
| 26 | #define TOPIC_LOSS "device/metrics/loss" | ||
| 27 | #define TOPIC_THROUGHPUT "device/metrics/throughput" | ||
| 28 | #define TOPIC_CPU "device/metrics/cpu" | ||
| 29 | #define TOPIC_NET_TX "device/metrics/net_tx" | ||
| 30 | #define TOPIC_NET_RX "device/metrics/net_rx" | ||
| 31 | |||
| 17 | static void mqtt_event_handler(void *handler_args, esp_event_base_t base, | 32 | static void mqtt_event_handler(void *handler_args, esp_event_base_t base, |
| 18 | int32_t event_id, void *event_data) | 33 | int32_t event_id, void *event_data) |
| 19 | { | 34 | { |
| @@ -22,7 +37,7 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, | |||
| 22 | switch (event->event_id) { | 37 | switch (event->event_id) { |
| 23 | case MQTT_EVENT_CONNECTED: | 38 | case MQTT_EVENT_CONNECTED: |
| 24 | ESP_LOGI(TAG, "MQTT connected!"); | 39 | ESP_LOGI(TAG, "MQTT connected!"); |
| 25 | esp_mqtt_client_subscribe(event->client, SUB_TOPIC, 1); | 40 | esp_mqtt_client_subscribe(event->client, PUB_TOPIC, 1); |
| 26 | break; | 41 | break; |
| 27 | 42 | ||
| 28 | case MQTT_EVENT_DATA: { | 43 | case MQTT_EVENT_DATA: { |
| @@ -34,21 +49,23 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, | |||
| 34 | memcpy(data, event->data, event->data_len); | 49 | memcpy(data, event->data, event->data_len); |
| 35 | data[event->data_len] = '\0'; | 50 | data[event->data_len] = '\0'; |
| 36 | 51 | ||
| 37 | ESP_LOGI(TAG, "MQTT Msg received | Topic: %s | Data: %s", topic, | 52 | rx_bytes += event->data_len; |
| 38 | data); | ||
| 39 | 53 | ||
| 40 | // Echo wiadomości do PUB_TOPIC tylko jeśli timestamp jest | 54 | long long now = esp_timer_get_time() / 1000ULL; |
| 41 | // sensowny | ||
| 42 | char *endptr; | 55 | char *endptr; |
| 43 | long long ts = strtoll(data, &endptr, 10); | 56 | long long ts = strtoll(data, &endptr, 10); |
| 57 | |||
| 44 | if (endptr != data && *endptr == '\0' && ts > 0 && ts < 1e13) { | 58 | if (endptr != data && *endptr == '\0' && ts > 0 && ts < 1e13) { |
| 45 | esp_mqtt_client_publish(event->client, PUB_TOPIC, data, | 59 | long long rtt = now - ts; |
| 46 | 0, 1, 0); | 60 | if (rtt >= 0 && rtt < 10000) { |
| 47 | } else { | 61 | last_rtt_prev = last_rtt; |
| 48 | ESP_LOGW(TAG, "Nieprawidłowe dane, ignorowane: %s", | 62 | last_rtt = rtt; |
| 49 | data); | 63 | received_count++; |
| 64 | esp_mqtt_client_publish( | ||
| 65 | event->client, SUB_TOPIC, data, 0, 1, 0); | ||
| 66 | tx_bytes += strlen(data); | ||
| 67 | } | ||
| 50 | } | 68 | } |
| 51 | |||
| 52 | break; | 69 | break; |
| 53 | } | 70 | } |
| 54 | 71 | ||
| @@ -65,9 +82,96 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, | |||
| 65 | } | 82 | } |
| 66 | } | 83 | } |
| 67 | 84 | ||
| 68 | /** | 85 | static void mqtt_test_task(void *pvParameters) |
| 69 | * @brief Uruchamia klienta MQTT | 86 | { |
| 70 | */ | 87 | int freq = *((int *)pvParameters); |
| 88 | int delay_ms = 1000 / freq; | ||
| 89 | |||
| 90 | while (1) { | ||
| 91 | last_sent = esp_timer_get_time() / 1000ULL; | ||
| 92 | char msg[32]; | ||
| 93 | snprintf(msg, sizeof(msg), "%lld", last_sent); | ||
| 94 | esp_mqtt_client_publish(client, PUB_TOPIC, msg, 0, 1, 0); | ||
| 95 | tx_bytes += strlen(msg); | ||
| 96 | sent_count++; | ||
| 97 | vTaskDelay(pdMS_TO_TICKS(delay_ms)); | ||
| 98 | } | ||
| 99 | } | ||
| 100 | |||
| 101 | static void mqtt_metrics_task(void *pvParameters) | ||
| 102 | { | ||
| 103 | long long last_time = esp_timer_get_time(); | ||
| 104 | while (1) { | ||
| 105 | long long now = esp_timer_get_time(); | ||
| 106 | long long dt = (now - last_time) / 1000ULL; | ||
| 107 | last_time = now; | ||
| 108 | |||
| 109 | // RTT | ||
| 110 | if (last_rtt > 0) { | ||
| 111 | char buf[32]; | ||
| 112 | snprintf(buf, sizeof(buf), "%lld", last_rtt); | ||
| 113 | esp_mqtt_client_publish(client, TOPIC_RTT, buf, 0, 1, | ||
| 114 | 0); | ||
| 115 | tx_bytes += strlen(buf); | ||
| 116 | } | ||
| 117 | |||
| 118 | // Jitter | ||
| 119 | if (last_rtt_prev > 0) { | ||
| 120 | long long jitter = llabs(last_rtt - last_rtt_prev); | ||
| 121 | char buf[32]; | ||
| 122 | snprintf(buf, sizeof(buf), "%lld", jitter); | ||
| 123 | esp_mqtt_client_publish(client, TOPIC_JITTER, buf, 0, 1, | ||
| 124 | 0); | ||
| 125 | tx_bytes += strlen(buf); | ||
| 126 | } | ||
| 127 | |||
| 128 | // Packet loss | ||
| 129 | int loss = sent_count > 0 ? (sent_count - received_count) * | ||
| 130 | 100 / sent_count | ||
| 131 | : 0; | ||
| 132 | char buf_loss[16]; | ||
| 133 | snprintf(buf_loss, sizeof(buf_loss), "%d", loss); | ||
| 134 | esp_mqtt_client_publish(client, TOPIC_LOSS, buf_loss, 0, 1, 0); | ||
| 135 | tx_bytes += strlen(buf_loss); | ||
| 136 | |||
| 137 | // Throughput | ||
| 138 | char buf_thr[16]; | ||
| 139 | snprintf(buf_thr, sizeof(buf_thr), "%d", received_count); | ||
| 140 | esp_mqtt_client_publish(client, TOPIC_THROUGHPUT, buf_thr, 0, 1, | ||
| 141 | 0); | ||
| 142 | tx_bytes += strlen(buf_thr); | ||
| 143 | |||
| 144 | // CPU usage | ||
| 145 | UBaseType_t free_heap = xPortGetFreeHeapSize(); | ||
| 146 | UBaseType_t min_free_heap = xPortGetMinimumEverFreeHeapSize(); | ||
| 147 | int cpu_load = | ||
| 148 | (int)((1.0 - ((float)free_heap / (float)min_free_heap)) * | ||
| 149 | 100.0); | ||
| 150 | if (cpu_load < 0) | ||
| 151 | cpu_load = 0; | ||
| 152 | if (cpu_load > 100) | ||
| 153 | cpu_load = 100; | ||
| 154 | char buf_cpu[16]; | ||
| 155 | snprintf(buf_cpu, sizeof(buf_cpu), "%d", cpu_load); | ||
| 156 | esp_mqtt_client_publish(client, TOPIC_CPU, buf_cpu, 0, 1, 0); | ||
| 157 | tx_bytes += strlen(buf_cpu); | ||
| 158 | |||
| 159 | // Network TX/RX bytes | ||
| 160 | char buf_tx[32], buf_rx[32]; | ||
| 161 | snprintf(buf_tx, sizeof(buf_tx), "%lu", tx_bytes); | ||
| 162 | snprintf(buf_rx, sizeof(buf_rx), "%lu", rx_bytes); | ||
| 163 | esp_mqtt_client_publish(client, TOPIC_NET_TX, buf_tx, 0, 1, 0); | ||
| 164 | esp_mqtt_client_publish(client, TOPIC_NET_RX, buf_rx, 0, 1, 0); | ||
| 165 | |||
| 166 | // Reset liczniki na kolejny interwał | ||
| 167 | tx_bytes = 0; | ||
| 168 | rx_bytes = 0; | ||
| 169 | received_count = 0; | ||
| 170 | |||
| 171 | vTaskDelay(pdMS_TO_TICKS(1000)); | ||
| 172 | } | ||
| 173 | } | ||
| 174 | |||
| 71 | void mqtt_app_start(void) | 175 | void mqtt_app_start(void) |
| 72 | { | 176 | { |
| 73 | esp_mqtt_client_config_t mqtt_cfg = { | 177 | esp_mqtt_client_config_t mqtt_cfg = { |
| @@ -80,4 +184,9 @@ void mqtt_app_start(void) | |||
| 80 | esp_mqtt_client_start(client); | 184 | esp_mqtt_client_start(client); |
| 81 | 185 | ||
| 82 | ESP_LOGI(TAG, "MQTT client started"); | 186 | ESP_LOGI(TAG, "MQTT client started"); |
| 187 | |||
| 188 | static int freq = 20; | ||
| 189 | xTaskCreate(mqtt_test_task, "mqtt_test_task", 4096, &freq, 5, NULL); | ||
| 190 | xTaskCreate(mqtt_metrics_task, "mqtt_metrics_task", 4096, NULL, 5, | ||
| 191 | NULL); | ||
| 83 | } | 192 | } |
