Продолжаем работу с протоколами модели OSI и на данном уроке мы попытаемся создать клиент MQTT.
С протоколом MQTT (Message Queuing Telemetry Transport) мы знакомы из уроков по передаче данных этому и этому.
Мы знаем, что клиент может быть одновременно и издателем и подписчиком тем, поэтому мы и наш клиент научим и тому и другому.
Также у нас есть брокер MQTT на плате Raspberry PI, который мы создали здесь.
А здесь мы отдельно поработали с уровнями качества обслуживания.
Подключим к питанию плату Raspberry PI, тем самым запустив наш брокер Mosquitto
Также к порту USB компьютера подключим плату ESP32
Проект мы сделаем из проекта урока 32 с именем WIFI_STA_HTTP_SERVER_AJAX и дадим ему новое имя WIFI_STA_TCP_MQTT. TCP в имени проекта будет говорить о том, что MQTT мы заводим поверх TCP, а не через WebSocket.
Откроем наш проект в Espressif IDE и для начала переименуем файлы http.c и http.h в mqtt.c и mqtt.h с обновлением ссылок. Как это делать, мы знаем.
В файле CMakeLists.txt также обновим имя подключаемого модуля
set(COMPONENT_SRCS "main.c wifi.c mqtt.c")
Перейдём в файл mqtt.c и удалим из него полностью весь код и добавим только вот это
1 2 3 4 |
#include "mqtt.h" //------------------------------------------------------------- static const char *TAG = "mqtt"; //------------------------------------------------------------- |
Добавим функцию запуска клиента MQTT, пока с пустым телом
1 2 3 4 5 6 |
static const char *TAG = "mqtt"; //------------------------------------------------------------- void mqtt_start(void) { } //------------------------------------------------------------- |
Перейдём в заголовочный файл mqtt.h и удалим также там весь код, а добавим вот этот с учётом прототипа только что добавленной нами функции
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#ifndef MAIN_MQTT_H_ #define MAIN_MQTT_H_ //------------------------------------------------------------- #include <string.h> #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "freertos/event_groups.h" #include "esp_log.h" #include "mqtt_client.h" #include "lwip/sockets.h" #include "lwip/dns.h" #include "lwip/netdb.h" //------------------------------------------------------------- void mqtt_start(void); //------------------------------------------------------------- #endif /* MAIN_MQTT_H_ */ |
Перейдём в файл main.c и в функции app_main удалим код, отвечающий за ножки светодиода RGB
gpio_reset_pin(CONFIG_RED_GPIO);
gpio_set_direction(CONFIG_RED_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(CONFIG_RED_GPIO, 1);
gpio_reset_pin(CONFIG_GREEN_GPIO);
gpio_set_direction(CONFIG_GREEN_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(CONFIG_GREEN_GPIO, 1);
gpio_reset_pin(CONFIG_BLUE_GPIO);
gpio_set_direction(CONFIG_BLUE_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(CONFIG_BLUE_GPIO, 1);
SPIFFS мы сегодня задействовать не будем, поэтому удалим код его инициализации
ESP_LOGI(TAG, "Initializing SPIFFS");
esp_vfs_spiffs_conf_t conf = {
.base_path = "/spiffs",
.partition_label = NULL,
.max_files = 5,
.format_if_mount_failed = true
};
ret = esp_vfs_spiffs_register(&conf);
if (ret != ESP_OK) {
if (ret == ESP_FAIL) {
ESP_LOGE(TAG, "Failed to mount or format filesystem");
} else if (ret == ESP_ERR_NOT_FOUND) {
ESP_LOGE(TAG, "Failed to find SPIFFS partition");
} else {
ESP_LOGE(TAG, "Failed to initialize SPIFFS (%s)", esp_err_to_name(ret));
}
return;
}
size_t total = 0, used = 0;
ret = esp_spiffs_info(conf.partition_label, &total, &used);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Failed to get SPIFFS partition information (%s)", esp_err_to_name(ret));
} else {
ESP_LOGI(TAG, "Partition size: total: %d, used: %d", total, used);
}
Вызовем функцию запуска клиента MQTT
1 2 3 |
ESP_LOGI(TAG, "wifi_init_sta: %d", ret); mqtt_start(); |
Так как SPIFFS мы задействовать не будем, то из файла partitions.csv удалим строку с конфигурацией данного раздела
storage, data, spiffs, , 0xF0000,
В строке выше немного прибавим памяти на основной раздел, чтобы компилятор не ругался
factory, app, factory, 0x10000, 0x120000,
В файле wifi.c из функции on_wifi_disconnect удалим код остановки web-сервера
httpd_handle_t* server = (httpd_handle_t*) arg;
if (*server) {
ESP_LOGI(TAG, "Stopping webserver");
stop_webserver(*server);
*server = NULL;
}
Также из функции on_got_ip удалим код запуска этого сервера
httpd_handle_t* server = (httpd_handle_t*) arg;
if (*server == NULL) {
ESP_LOGI(TAG, "Starting webserver");
*server = start_webserver();
}
Из функции wifi_start удалим инициализацию указателя на web-сервер
static httpd_handle_t server = NULL;
Также в данной функции изменим последний параметр вот в этом вызове на NULL
ret = esp_event_handler_register(WIFI_EVENT, WIFI_EVENT_STA_DISCONNECTED, &on_wifi_disconnect, NULL);
И здесь тоже
ret = esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &on_got_ip, NULL);
В файле wifi.h подключение файла также нам будет не нужно
#include "mqtt.h"
Теперь немного поработаем с конфигуратором.
Переименуем для красоты меню
menu "TCP MQTT Configuration"
Так как светодиод RGB мы не используем, то удалим код настройки его ножек
config RED_GPIO
int "LED RED GPIO number"
range 0 48
default 21
help
RED GPIO.
config GREEN_GPIO
int "LED GREEN GPIO number"
range 0 48
default 22
help
GREEN GPIO.
config BLUE_GPIO
int "LED BLUE GPIO number"
range 0 48
default 23
help
BLUE GPIO.
Код настройки сервера также удалим
config SERVER_IP
string "SERVER IPV4 Address"
default "192.168.0.13"
help
SERVER IPV4 Address.
config SERVER_PORT
int "Server Port"
range 0 65535
default 3333
help
The remote port.
config CLIENT_PORT
int "Client Port"
range 0 65535
default 4444
help
The local port.
Вместо этого добавим пункт настройки адреса брокера MQTT
1 2 3 4 5 |
config BROKER_URL string "BROKER URL" default "mqtt://192.168.1.138" help BROKER URL. |
Запустим конфигуратор и заполним в разделе TCP MQTT Configuration адрес нашего брокера
В функции mqtt_start в файле mqtt.c объявим и инициализируем переменную типа структуры конфигурации клиента MQTT
1 2 3 4 5 6 7 |
void mqtt_start(void) { esp_mqtt_client_config_t mqtt_cfg = { .uri = CONFIG_BROKER_URL, .password = "123456", .username = "mosquitto", }; |
Произведём первичную инициализацию клиента, получив адрес на его структуру
1 2 3 4 |
.username = "mosquitto", }; esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); |
Объявим глобальную переменную для группы событий
1 2 |
static const char *TAG = "mqtt"; static EventGroupHandle_t mqtt_state_event_group; |
Вернёмся в функцию mqtt_start и создадим группу событий
1 2 3 |
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); mqtt_state_event_group = xEventGroupCreate(); |
Добавим функцию обратного вызова для обработки событий клиента MQTT, в котором в случае режима отладки выведем некоторую информацию из входных параметров
1 2 3 4 5 6 7 |
static EventGroupHandle_t mqtt_state_event_group; //------------------------------------------------------------- static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); } //------------------------------------------------------------- |
Заберём некоторые входные параметры в локальные переменные и объявим целочисленную переменную для работы с идентификатором сообщений
1 2 3 4 |
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; |
Заготовим обработку некоторых вариантов событий клиента, какие именно, понятно из имён макросов
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
int msg_id; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: break; case MQTT_EVENT_DISCONNECTED: break; case MQTT_EVENT_SUBSCRIBED: break; case MQTT_EVENT_UNSUBSCRIBED: break; case MQTT_EVENT_DATA: break; case MQTT_EVENT_ERROR: break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } |
В функции mqtt_start зарегистрируем наш обработчик и запустим клиент
1 2 3 4 |
mqtt_state_event_group = xEventGroupCreate(); esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); esp_mqtt_client_start(client); |
Объявим несколько макросов для битов группы событий
1 2 3 4 5 6 7 |
static EventGroupHandle_t mqtt_state_event_group; //------------------------------------------------------------- #define MQTT_EVT_CONNECTED BIT0 #define MQTT_EVT_SUBSCRIBED BIT1 #define MQTT_EVT_DISCONNECTED BIT2 #define MQTT_EVT_UNSUBSCRIBED BIT3 //------------------------------------------------------------- |
Добавим функцию для вывода ошибки в терминал
1 2 3 4 5 6 7 8 9 |
#define MQTT_EVT_UNSUBSCRIBED BIT3 //------------------------------------------------------------- static void log_error_if_nonzero(const char *message, int error_code) { if (error_code != 0) { ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); } } //------------------------------------------------------------- |
В соответствующей ветке обработчика mqtt_event_handler определим ошибку и выведем информацию о ней в терминал
1 2 3 4 5 6 7 8 |
case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); } |
Если у нас произойдём событие соединения с MQTT-сервером, то установим соответствующий бит в группе событий, а бит состояния разъединения сбросим
1 2 3 4 |
case MQTT_EVENT_CONNECTED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_CONNECTED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_DISCONNECTED); ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); |
Соберём код, прошьём контроллер и убедимся в терминале, что мы соединились с брокером
Также у нас произошло событие 7. Если посчитать в перечисляемом типе, то это вот что
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
Попробуем опубликовать сообщение
1 2 3 |
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); msg_id = esp_mqtt_client_publish(client, "house/s1", "data_1", 0, 1, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); |
Первый, второй и третий параметр, думаю понятен.
Четвёртый параметр — длина сообщения. Если ноль, то определяется автоматически по передаваемой строке.
Пятый — это уровень качества обслуживания, попробуем выставить QoS1.
Шестой — флаг retain. мы его не используем, поэтому ноль.
Соберём код, прошьём контроллер.
Запустим MQTT Explorer, которым мы пользовались в уроке 2 по передаче данных.
Настройки соединения не трогаем.
Убедимся, что мы подписаны на данный топик
Уровень качества обслуживания оставляем максимальный. Соединимся с брокером с помощью кнопки Connect.
Перезагрузим контроллер и если также всё нормально, то мы получим наше сообщение в MQTT Explorer
Также мы видим, что сообщение пришло с уровнем качества обслуживания 1.
А в терминале мы видим, что сообщение от нас ушло и видим его идентификатор
Также мы видим, что у нас произошло событие с идентификатором 5. Посчитав в перечисляемом типе, мы определим, что это вот какое событие
MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */
Это событие успешной публикации сообщения.
Именно об успешной, то есть брокер подтвердил доставку. Если мы установим уровень качества обслуживания 0 в публикации, то такого события мы не получим.
Поэтому в своих проектах для проверки доставки опубликованных сообщений мы можем использовать обработку данного события.
Теперь давайте подпишемся на какие-нибудь топики с разными уровнями качества обслуживания
1 2 3 4 5 6 7 |
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "house2/room2/cmd0", 0); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "house2/room2/cmd1", 1); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "house2/room2/cmd2", 2); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); |
Мы используем имя группы house2, а не house, для того, чтобы не было перекрёстных посылок. Мы с таким встречались также в уроке по практике уровней качества обслуживания.
Обработаем событие подписки
1 2 3 4 |
case MQTT_EVENT_SUBSCRIBED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_SUBSCRIBED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_UNSUBSCRIBED); ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); |
Также заодно обработаем событие отписки
1 2 3 4 |
case MQTT_EVENT_UNSUBSCRIBED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_UNSUBSCRIBED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_SUBSCRIBED); ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); |
Обработаем событие потери соединения
1 2 3 4 |
case MQTT_EVENT_DISCONNECTED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_DISCONNECTED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_CONNECTED); ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); |
И обработаем ещё событие получения сообщения, где выведем в терминал топик и само сообщение
1 2 3 4 |
case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); |
Запустим WireShark, отфильтруемся по адресу нашей платы, соберём код, прошьём контроллер и увидим в терминале, что подписка на топики прошла успешно
Я думаю, что вы уже поняли, что WireShark мы запустим не на компьютере, а на Raspberry PI, так как клиенты между собой не взаимодействуют по сети, только через брокера.
Мы увидим наши пакеты с соединением и подпиской
Также посмотрим состав пакета с нашими подписками
Все подписки здесь с соответствующими уровнями качества обслуживания QoS.
Попробуем опубликовать что-нибудь в данные топики в MQTT Explorer. Упражняться с уровнями не будем и передадим все сообщения с уровнями, как и в подписках
Посмотрим, как наши сообщения пришли в нашу плату
Все сообщения получены.
Также посмотрим в WireShark, как отправлял брокер нам наши сообщения
Не заглядывая внутрь пакетов, можно и так понять, с каким уровнем сообщения отправились нашему клиенту, глядя на отсутствие и наличие подтверждений публикаций.
Давайте теперь попробуем автоматизировать периодическую отправку сообщений.
Для этого идём в функцию , в которой объявим целочисленную переменную и организуем бесконечный цикл, в котором узнаем состояние нашего клиента и пока добавим задержку в 2 секунды
1 2 3 4 5 6 7 8 9 10 11 12 |
esp_mqtt_client_start(client); uint16_t i = 0; while(1) { EventBits_t bits = xEventGroupWaitBits(mqtt_state_event_group, MQTT_EVT_CONNECTED, pdFALSE, pdTRUE, 0); vTaskDelay(2000 / portTICK_PERIOD_MS); } |
Перед задержкой, в случае, если состояние у нас CONNECTED, отправим инкрементируемое число, преобразованное в строку сразу в три топика также с разным уровнем качества обслуживания. Если дойдём до 10, то попробуем отписаться от одного из топиков
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
pdTRUE, 0); if (bits & MQTT_EVT_CONNECTED) { int msg_id; char str01[30]; sprintf(str01, "%d",i); ESP_LOGI(TAG, "i = %d", i); msg_id = esp_mqtt_client_publish(client, "house/room1/temp0", str01, 0, 0, 0); msg_id = esp_mqtt_client_publish(client, "house/room1/temp1", str01, 0, 1, 0); msg_id = esp_mqtt_client_publish(client, "house/room1/temp2", str01, 0, 2, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); i++; if(i==10) { msg_id = esp_mqtt_client_unsubscribe(client, "/house/room2/cmd0"); ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); } } |
Соберём код, прошьём контроллер и посмотрим в MQTT Explorer, как будут приходить наши сообщения
В терминале мы видим, что мы отписались от топика
Также в MQTT Explorer мы можем посмотреть графическое отображение полученных чисел
В WireShark мы можем найти пакет с отпиской в общем TCP-пакете с публикацией
Итак, на данном уроке нам удалось создать вполне работоспособный клиент MQTT, который умеет в принципе всё, что должен уметь такой клиент.
Всем спасибо за внимание!
Предыдущий урок Программирование МК ESP32 Следующий урок
Недорогие отладочные платы ESP32 можно купить здесь Недорогие отладочные платы ESP32
Недорогие отладочные платы ESP32/ESP32-C3/ESP32-S3 можно купить здесь Недорогие отладочные платы ESP32
Логический анализатор 16 каналов можно приобрести здесь
Смотреть ВИДЕОУРОК в RuTube (нажмите на картинку)
Смотреть ВИДЕОУРОК в YouTube (нажмите на картинку)
Смотреть ВИДЕОУРОК в Дзен (нажмите на картинку)
Добавить комментарий