Продолжаем работу с протоколами модели OSI и на данном уроке мы попытаемся создать клиент MQTT.
С протоколом MQTT (Message Queuing Telemetry Transport) мы знакомы из уроков по передаче данных этому и этому.
Мы знаем, что клиент может быть одновременно и издателем и подписчиком тем, поэтому мы и наш клиент научим и тому и другому.
Также у нас есть брокер MQTT на плате Raspberry PI, который мы создали здесь.
А здесь мы отдельно поработали с уровнями качества обслуживания.
Подключим к питанию плату Raspberry PI, тем самым запустив наш брокер Mosquitto
Также к порту USB компьютера подключим плату ESP32
Проект мы сделаем из проекта урока 32 с именем WIFI_STA_HTTP_SERVER_AJAX и дадим ему новое имя WIFI_STA_TCP_MQTT. TCP в имени проекта будет говорить о том, что MQTT мы заводим поверх TCP, а не через WebSocket.
Откроем наш проект в Espressif IDE и для начала переименуем файлы http.c и http.h в mqtt.c и mqtt.h с обновлением ссылок. Как это делать, мы знаем.
В файле CMakeLists.txt также обновим имя подключаемого модуля
set(COMPONENT_SRCS "main.c wifi.c mqtt.c")
Перейдём в файл mqtt.c и удалим из него полностью весь код и добавим только вот это
1 2 3 4 |
#include "mqtt.h" //------------------------------------------------------------- static const char *TAG = "mqtt"; //------------------------------------------------------------- |
Добавим функцию запуска клиента MQTT, пока с пустым телом
1 2 3 4 5 6 |
static const char *TAG = "mqtt"; //------------------------------------------------------------- void mqtt_start(void) { } //------------------------------------------------------------- |
Перейдём в заголовочный файл mqtt.h и удалим также там весь код, а добавим вот этот с учётом прототипа только что добавленной нами функции
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#ifndef MAIN_MQTT_H_ #define MAIN_MQTT_H_ //------------------------------------------------------------- #include <string.h> #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "freertos/event_groups.h" #include "esp_log.h" #include "mqtt_client.h" #include "lwip/sockets.h" #include "lwip/dns.h" #include "lwip/netdb.h" //------------------------------------------------------------- void mqtt_start(void); //------------------------------------------------------------- #endif /* MAIN_MQTT_H_ */ |
Перейдём в файл main.c и в функции app_main удалим код, отвечающий за ножки светодиода RGB
gpio_reset_pin(CONFIG_RED_GPIO);
gpio_set_direction(CONFIG_RED_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(CONFIG_RED_GPIO, 1);
gpio_reset_pin(CONFIG_GREEN_GPIO);
gpio_set_direction(CONFIG_GREEN_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(CONFIG_GREEN_GPIO, 1);
gpio_reset_pin(CONFIG_BLUE_GPIO);
gpio_set_direction(CONFIG_BLUE_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(CONFIG_BLUE_GPIO, 1);
SPIFFS мы сегодня задействовать не будем, поэтому удалим код его инициализации
ESP_LOGI(TAG, "Initializing SPIFFS");
esp_vfs_spiffs_conf_t conf = {
.base_path = "/spiffs",
.partition_label = NULL,
.max_files = 5,
.format_if_mount_failed = true
};
ret = esp_vfs_spiffs_register(&conf);
if (ret != ESP_OK) {
if (ret == ESP_FAIL) {
ESP_LOGE(TAG, "Failed to mount or format filesystem");
} else if (ret == ESP_ERR_NOT_FOUND) {
ESP_LOGE(TAG, "Failed to find SPIFFS partition");
} else {
ESP_LOGE(TAG, "Failed to initialize SPIFFS (%s)", esp_err_to_name(ret));
}
return;
}
size_t total = 0, used = 0;
ret = esp_spiffs_info(conf.partition_label, &total, &used);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Failed to get SPIFFS partition information (%s)", esp_err_to_name(ret));
} else {
ESP_LOGI(TAG, "Partition size: total: %d, used: %d", total, used);
}
Вызовем функцию запуска клиента MQTT
1 2 3 |
ESP_LOGI(TAG, "wifi_init_sta: %d", ret); mqtt_start(); |
Так как SPIFFS мы задействовать не будем, то из файла partitions.csv удалим строку с конфигурацией данного раздела
storage, data, spiffs, , 0xF0000,
В строке выше немного прибавим памяти на основной раздел, чтобы компилятор не ругался
factory, app, factory, 0x10000, 0x120000,
В файле wifi.c из функции on_wifi_disconnect удалим код остановки web-сервера
httpd_handle_t* server = (httpd_handle_t*) arg;
if (*server) {
ESP_LOGI(TAG, "Stopping webserver");
stop_webserver(*server);
*server = NULL;
}
Также из функции on_got_ip удалим код запуска этого сервера
httpd_handle_t* server = (httpd_handle_t*) arg;
if (*server == NULL) {
ESP_LOGI(TAG, "Starting webserver");
*server = start_webserver();
}
Из функции wifi_start удалим инициализацию указателя на web-сервер
static httpd_handle_t server = NULL;
Также в данной функции изменим последний параметр вот в этом вызове на NULL
ret = esp_event_handler_register(WIFI_EVENT, WIFI_EVENT_STA_DISCONNECTED, &on_wifi_disconnect, NULL);
И здесь тоже
ret = esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &on_got_ip, NULL);
В файле wifi.h подключение файла также нам будет не нужно
#include "mqtt.h"
Теперь немного поработаем с конфигуратором.
Переименуем для красоты меню
menu "TCP MQTT Configuration"
Так как светодиод RGB мы не используем, то удалим код настройки его ножек
config RED_GPIO
int "LED RED GPIO number"
range 0 48
default 21
help
RED GPIO.
config GREEN_GPIO
int "LED GREEN GPIO number"
range 0 48
default 22
help
GREEN GPIO.
config BLUE_GPIO
int "LED BLUE GPIO number"
range 0 48
default 23
help
BLUE GPIO.
Код настройки сервера также удалим
config SERVER_IP
string "SERVER IPV4 Address"
default "192.168.0.13"
help
SERVER IPV4 Address.
config SERVER_PORT
int "Server Port"
range 0 65535
default 3333
help
The remote port.
config CLIENT_PORT
int "Client Port"
range 0 65535
default 4444
help
The local port.
Вместо этого добавим пункт настройки адреса брокера MQTT
1 2 3 4 5 |
config BROKER_URL string "BROKER URL" default "mqtt://192.168.1.138" help BROKER URL. |
Запустим конфигуратор и заполним в разделе TCP MQTT Configuration адрес нашего брокера
В функции mqtt_start в файле mqtt.c объявим и инициализируем переменную типа структуры конфигурации клиента MQTT
1 2 3 4 5 6 7 |
void mqtt_start(void) { esp_mqtt_client_config_t mqtt_cfg = { .uri = CONFIG_BROKER_URL, .password = "123456", .username = "mosquitto", }; |
Произведём первичную инициализацию клиента, получив адрес на его структуру
1 2 3 4 |
.username = "mosquitto", }; esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); |
Объявим глобальную переменную для группы событий
1 2 |
static const char *TAG = "mqtt"; static EventGroupHandle_t mqtt_state_event_group; |
Вернёмся в функцию mqtt_start и создадим группу событий
1 2 3 |
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); mqtt_state_event_group = xEventGroupCreate(); |
Добавим функцию обратного вызова для обработки событий клиента MQTT, в котором в случае режима отладки выведем некоторую информацию из входных параметров
1 2 3 4 5 6 7 |
static EventGroupHandle_t mqtt_state_event_group; //------------------------------------------------------------- static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); } //------------------------------------------------------------- |
Заберём некоторые входные параметры в локальные переменные и объявим целочисленную переменную для работы с идентификатором сообщений
1 2 3 4 |
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); esp_mqtt_event_handle_t event = event_data; esp_mqtt_client_handle_t client = event->client; int msg_id; |
Заготовим обработку некоторых вариантов событий клиента, какие именно, понятно из имён макросов
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
int msg_id; switch ((esp_mqtt_event_id_t)event_id) { case MQTT_EVENT_CONNECTED: break; case MQTT_EVENT_DISCONNECTED: break; case MQTT_EVENT_SUBSCRIBED: break; case MQTT_EVENT_UNSUBSCRIBED: break; case MQTT_EVENT_DATA: break; case MQTT_EVENT_ERROR: break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } |
В функции mqtt_start зарегистрируем наш обработчик и запустим клиент
1 2 3 4 |
mqtt_state_event_group = xEventGroupCreate(); esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); esp_mqtt_client_start(client); |
Объявим несколько макросов для битов группы событий
1 2 3 4 5 6 7 |
static EventGroupHandle_t mqtt_state_event_group; //------------------------------------------------------------- #define MQTT_EVT_CONNECTED BIT0 #define MQTT_EVT_SUBSCRIBED BIT1 #define MQTT_EVT_DISCONNECTED BIT2 #define MQTT_EVT_UNSUBSCRIBED BIT3 //------------------------------------------------------------- |
Добавим функцию для вывода ошибки в терминал
1 2 3 4 5 6 7 8 9 |
#define MQTT_EVT_UNSUBSCRIBED BIT3 //------------------------------------------------------------- static void log_error_if_nonzero(const char *message, int error_code) { if (error_code != 0) { ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); } } //------------------------------------------------------------- |
В соответствующей ветке обработчика mqtt_event_handler определим ошибку и выведем информацию о ней в терминал
1 2 3 4 5 6 7 8 |
case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); } |
Если у нас произойдём событие соединения с MQTT-сервером, то установим соответствующий бит в группе событий, а бит состояния разъединения сбросим
1 2 3 4 |
case MQTT_EVENT_CONNECTED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_CONNECTED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_DISCONNECTED); ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); |
Соберём код, прошьём контроллер и убедимся в терминале, что мы соединились с брокером
Также у нас произошло событие 7. Если посчитать в перечисляемом типе, то это вот что
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
Попробуем опубликовать сообщение
1 2 3 |
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); msg_id = esp_mqtt_client_publish(client, "house/s1", "data_1", 0, 1, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); |
Первый, второй и третий параметр, думаю понятен.
Четвёртый параметр — длина сообщения. Если ноль, то определяется автоматически по передаваемой строке.
Пятый — это уровень качества обслуживания, попробуем выставить QoS1.
Шестой — флаг retain. мы его не используем, поэтому ноль.
Соберём код, прошьём контроллер.
Запустим MQTT Explorer, которым мы пользовались в уроке 2 по передаче данных.
Настройки соединения не трогаем.
Убедимся, что мы подписаны на данный топик
Уровень качества обслуживания оставляем максимальный. Соединимся с брокером с помощью кнопки Connect.
Перезагрузим контроллер и если также всё нормально, то мы получим наше сообщение в MQTT Explorer
Также мы видим, что сообщение пришло с уровнем качества обслуживания 1.
А в терминале мы видим, что сообщение от нас ушло и видим его идентификатор
Также мы видим, что у нас произошло событие с идентификатором 5. Посчитав в перечисляемом типе, мы определим, что это вот какое событие
MQTT_EVENT_PUBLISHED, /*!< published event, additional context: msg_id */
Это событие успешной публикации сообщения.
Именно об успешной, то есть брокер подтвердил доставку. Если мы установим уровень качества обслуживания 0 в публикации, то такого события мы не получим.
Поэтому в своих проектах для проверки доставки опубликованных сообщений мы можем использовать обработку данного события.
Теперь давайте подпишемся на какие-нибудь топики с разными уровнями качества обслуживания
1 2 3 4 5 6 7 |
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "house2/room2/cmd0", 0); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "house2/room2/cmd1", 1); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); msg_id = esp_mqtt_client_subscribe(client, "house2/room2/cmd2", 2); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); |
Мы используем имя группы house2, а не house, для того, чтобы не было перекрёстных посылок. Мы с таким встречались также в уроке по практике уровней качества обслуживания.
Обработаем событие подписки
1 2 3 4 |
case MQTT_EVENT_SUBSCRIBED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_SUBSCRIBED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_UNSUBSCRIBED); ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); |
Также заодно обработаем событие отписки
1 2 3 4 |
case MQTT_EVENT_UNSUBSCRIBED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_UNSUBSCRIBED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_SUBSCRIBED); ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); |
Обработаем событие потери соединения
1 2 3 4 |
case MQTT_EVENT_DISCONNECTED: xEventGroupSetBits(mqtt_state_event_group, MQTT_EVT_DISCONNECTED); xEventGroupClearBits(mqtt_state_event_group, MQTT_EVT_CONNECTED); ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); |
И обработаем ещё событие получения сообщения, где выведем в терминал топик и само сообщение
1 2 3 4 |
case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); |
Запустим WireShark, отфильтруемся по адресу нашей платы, соберём код, прошьём контроллер и увидим в терминале, что подписка на топики прошла успешно
Я думаю, что вы уже поняли, что WireShark мы запустим не на компьютере, а на Raspberry PI, так как клиенты между собой не взаимодействуют по сети, только через брокера.
Мы увидим наши пакеты с соединением и подпиской
Также посмотрим состав пакета с нашими подписками
Все подписки здесь с соответствующими уровнями качества обслуживания QoS.
Попробуем опубликовать что-нибудь в данные топики в MQTT Explorer. Упражняться с уровнями не будем и передадим все сообщения с уровнями, как и в подписках
Посмотрим, как наши сообщения пришли в нашу плату
Все сообщения получены.
Также посмотрим в WireShark, как отправлял брокер нам наши сообщения
Не заглядывая внутрь пакетов, можно и так понять, с каким уровнем сообщения отправились нашему клиенту, глядя на отсутствие и наличие подтверждений публикаций.
Давайте теперь попробуем автоматизировать периодическую отправку сообщений.
Для этого идём в функцию , в которой объявим целочисленную переменную и организуем бесконечный цикл, в котором узнаем состояние нашего клиента и пока добавим задержку в 2 секунды
1 2 3 4 5 6 7 8 9 10 11 12 |
esp_mqtt_client_start(client); uint16_t i = 0; while(1) { EventBits_t bits = xEventGroupWaitBits(mqtt_state_event_group, MQTT_EVT_CONNECTED, pdFALSE, pdTRUE, 0); vTaskDelay(2000 / portTICK_PERIOD_MS); } |
Перед задержкой, в случае, если состояние у нас CONNECTED, отправим инкрементируемое число, преобразованное в строку сразу в три топика также с разным уровнем качества обслуживания. Если дойдём до 10, то попробуем отписаться от одного из топиков
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
pdTRUE, 0); if (bits & MQTT_EVT_CONNECTED) { int msg_id; char str01[30]; sprintf(str01, "%d",i); ESP_LOGI(TAG, "i = %d", i); msg_id = esp_mqtt_client_publish(client, "house/room1/temp0", str01, 0, 0, 0); msg_id = esp_mqtt_client_publish(client, "house/room1/temp1", str01, 0, 1, 0); msg_id = esp_mqtt_client_publish(client, "house/room1/temp2", str01, 0, 2, 0); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); i++; if(i==10) { msg_id = esp_mqtt_client_unsubscribe(client, "/house/room2/cmd0"); ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); } } |
Соберём код, прошьём контроллер и посмотрим в MQTT Explorer, как будут приходить наши сообщения
В терминале мы видим, что мы отписались от топика
Также в MQTT Explorer мы можем посмотреть графическое отображение полученных чисел
В WireShark мы можем найти пакет с отпиской в общем TCP-пакете с публикацией
Итак, на данном уроке нам удалось создать вполне работоспособный клиент MQTT, который умеет в принципе всё, что должен уметь такой клиент.
Всем спасибо за внимание!
Предыдущий урок Программирование МК ESP32 Следующий урок
Недорогие отладочные платы ESP32 можно купить здесь:
На AliExpress Недорогие отладочные платы ESP32
На Яндекс.Маркет Недорогие отладочные платы ESP32
Логический анализатор 16 каналов можно приобрести (AliExpress) здесь
Смотреть ВИДЕОУРОК в YouTube (нажмите на картинку)
Смотреть ВИДЕОУРОК в Дзен (нажмите на картинку)
Добавить комментарий