# MQTT - publish/subscribe для IoT и mobile push _Сеть: L4 и выше · LinuxLab Knowledge Base_ **TL;DR:** MQTT - lightweight pub/sub поверх TCP. Topics с wildcards, QoS 0/1/2, retained messages, last will. Малый overhead (2 байта min header). Брокер обязателен (mosquitto/EMQX/HiveMQ). Применение: IoT, telemetry, mobile push. ## Зачем MQTT IoT-устройства (сенсоры, ESP32, пром-PLC) - constrained: - Мало RAM/Flash, нельзя HTTP-stack с парсером JSON - Battery-powered - каждый wakeup экономия - Flaky network (cellular, mesh) - нужны guarantees доставки - Один device → много subscriber'ов (телеметрия в дашборд + DB + alert) HTTP не подходит: сервер не может push'ить в device без long-poll (требует open connection и keepalive). REST request/response не ложится на "событие на каждом сенсоре". **MQTT (MQ Telemetry Transport)** - lightweight publish/subscribe protocol поверх TCP (or TLS). 1999, OASIS standard с 2014. Версии: MQTT 3.1.1 (стандарт 2014), MQTT 5 (2019, сильно расширен). Use cases: - **IoT telemetry**: device → broker → consumers (DB, alerts, dashboard) - **Mobile push notifications**: AWS IoT, Facebook Messenger - **Industrial IoT**: SCADA, smart factory - **Smart home**: Home Assistant, Zigbee2MQTT - **Connected car**: telemetry в облако ## Pub/sub model Три участника: - **Publisher** - шлёт messages в topic - **Subscriber** - подписывается на topic (или pattern), получает messages - **Broker** - центральный узел, маршрутизирует publish → subscribers ``` ┌─────────┐ PUBLISH ┌────────┐ SUBSCRIBE ┌──────────┐ │ device1 │ ──"temp"──► │ │ ◄─────────── │ dashboard │ └─────────┘ │ │ └──────────┘ │ broker │ ┌─────────┐ │ │ ┌──────────┐ │ device2 │ ──"temp"──► │ │ ──"temp"──► │ database │ └─────────┘ └────────┘ └──────────┘ ``` Publisher и subscriber не знают друг о друге - только о брокере. ## Topics - hierarchical с wildcards Topic - строка с разделителями `/`: ``` home/livingroom/temperature home/livingroom/humidity home/kitchen/temperature factory/line1/machine42/error ``` Wildcards в subscription: - **`+`** - один уровень (`home/+/temperature` matches livingroom и kitchen) - **`#`** - все ниже (`home/#` matches всё под home/, на любую глубину) ``` Subscribe: home/+/temperature → получает home/livingroom/temperature, home/kitchen/temperature Subscribe: home/# → получает всё home/... ``` `#` только в конце, `+` в любой позиции. Topic строит сам publisher (нет схемы / discovery в core MQTT). ## QoS - три уровня доставки ### QoS 0 - "fire and forget" ``` publisher ──PUBLISH──► broker ──PUBLISH──► subscriber ``` Один пакет, без ack. Если потерян - потерян. Самый быстрый, default. Для сенсоров где старое значение не важно (новое прилетит через секунду). ### QoS 1 - "at least once" ``` publisher ──PUBLISH (id=42)──► broker publisher ◄──────PUBACK 42────── broker ``` Publisher хранит messages пока не пришёл PUBACK. Если timeout - retry с DUP-flag. Получатель **может** получить дубль (если PUBACK потерялся, publisher retry'ит). App должен быть idempotent. ### QoS 2 - "exactly once" Четырёхступенчатый handshake: ``` publisher ──PUBLISH (id=42)──► ◄──PUBREC 42───── ──PUBREL 42────► ◄──PUBCOMP 42─── broker ``` Дубликаты удаляются на уровне протокола. Самый дорогой, реально нужен в денежных транзакциях, изменении state. Для большинства IoT - QoS 1 достаточно. Subscriber запрашивает свой QoS при subscribe; effective = `min(publisher_qos, subscriber_qos)`. ## Retained messages - last known state Обычно subscriber получает только messages **после** subscribe. Если subscribe'нулся через час - не знает текущей температуры. **Retained flag**: publisher шлёт с `retain=true`, broker сохраняет как "last value" для этого topic. Новый subscriber **сразу** получает retained-message: ``` $ mosquitto_pub -t home/temperature -r -m "21.5" # сохранено $ mosquitto_sub -t home/temperature 21.5 ← пришло сразу при subscribe ``` Используется для **shadow state**: текущее состояние девайса. Очистить - publish empty payload с retain=true. ## Last Will and Testament (LWT) При CONNECT клиент может задать LWT-message: топик + payload, который броker автоматически опубликует когда клиент **abnormally отключится** (TCP timeout, не graceful DISCONNECT). Применение: - Device publish в `devices/dev42/status` payload "online" с retain - LWT: same topic, payload "offline" с retain - Если device отключился - все подписанные узнают, что он offline ## Wire format - бинарный, минималистичный Минимальный CONNECT (для MQTT 3.1.1): ``` 10 14 Fixed header: type=CONNECT, length=20 00 04 4D 51 54 54 protocol name: "MQTT" 04 protocol version: 4 (=3.1.1) 02 flags: clean session 00 3C keepalive: 60s 00 08 64 65 76 69 63 65 31 32 client id: "device12" ``` PUBLISH ещё короче - 2 байта header + topic + payload. На constrained device этого хватает. ## Connection - keepalive и persistent session ### Keepalive При CONNECT задаётся `keepalive` в секундах. Если клиент N секунд ничего не шлёт - должен послать **PINGREQ**, broker отвечает PINGRESP. Если broker не получает ничего за 1.5 × keepalive - закрывает. Для battery-powered: маленький keepalive (15-30 сек) → быстрая detection. Большой (300+ сек) → меньше radio wakeups, но slower detection. ### Persistent session CONNECT с **clean session = false** + client_id → broker помнит: - Подписки клиента - QoS 1/2 messages, не доставленные пока был offline Когда клиент reconnect'ится с тем же client_id - получит pending messages. Это **офлайн delivery**. Clean session=true (default в mqtt.js) - всё забыли при disconnect. ## MQTT 5 - что добавили - **Reason codes** - вместо просто "disconnect", broker говорит "почему" (`Server busy`, `Unauthorized`, `Topic name invalid`) - **User properties** - кастомные KV в headers (как HTTP) - **Shared subscriptions** - `$share/group/topic` - load balance между N instances - **Topic alias** - сжатие повторяющихся topic-имён - **Session expiry interval** - explicit вместо binary clean/persistent - **Authentication enhanced** - challenge-response, SASL - **Server redirect** - broker перенаправляет на другой Брокеры postепенно мигрируют. Mosquitto - MQTT 5 с 2.0. EMQX - native. ## Брокеры - сравнение | Брокер | Лицензия | Особенности | |--------|----------|-------------| | **Mosquitto** | EPL | Самый популярный, lightweight, single-node | | **EMQX** | Apache 2.0 | Кластерный, миллионы коннектов, MQTT 5 native | | **HiveMQ** | Commercial (CE free) | Enterprise, кластерный | | **VerneMQ** | Apache 2.0 | Эрланг-based, кластерный | | **AWS IoT Core** | managed | AWS-интегрированный, billing per-msg | | **Azure IoT Hub** | managed | Azure-интегрированный | Mosquitto - быстрый старт для testing, дома, маленьких deploy. EMQX/HiveMQ - production с миллионами устройств. ## TLS и аутентификация Plain MQTT - TCP/1883. Через TLS - **MQTT over TLS** на 8883. Mutual TLS ([[tls-certificates|mTLS]]) - стандарт в IoT: каждый device имеет свой client-cert, broker проверяет, выдаёт уникальный client_id. AWS IoT Core строится именно на этом. Альтернативы: - **Username/password** в CONNECT (обязательно поверх TLS!) - **JWT/token** в username field (broker валидирует) - **MQTT 5 enhanced auth** - challenge-response через SASL ## MQTT vs альтернативы | Свойство | MQTT | [[coap\|CoAP]] | AMQP 0.9.1 (RabbitMQ) | [[websocket\|WebSocket]] | |----------|------|----------------|-----------------------|--------------------------| | Транспорт | TCP/TLS | UDP/DTLS | TCP/TLS | TCP/TLS | | Min header | 2 байта | 4 байта | 8+ байт | 2-14 байт | | Pub/sub | да (через broker) | observe-pattern | да (queues+exchanges) | нет (raw) | | Reliable delivery | QoS 0/1/2 | confirmable=да/нет | persistent queue | нет | | Battery-friendly | средне | да (UDP, minimal) | нет | нет | | Schema/типизация | нет | нет | нет | нет | | Use case | IoT mobile telemetry | constrained device | enterprise message bus | bidir browser | ## MQTT через WebSocket Браузеры не могут открыть raw TCP/8883 (ограничения sandbox). Решение - **MQTT over WebSocket**: брокер слушает WS-port (8083, 8084 с TLS), upgrade'ит к MQTT внутри WS-frame'ов. Используется в browser-based dashboards (HiveMQ Insights, MQTT.js client). ## Когда что-то пошло не так - **Connection refused (TCP)** - broker не слушает. `nc -zv host 1883`. - **`bad_username_or_password`** - обычно AUTH provider в брокере не настроен или TLS-cert client'а не валиден. - **Не получаю messages после reconnect** - `clean session=true` забывает подписки. Используй `false` + persistent client_id. - **Дубли при QoS 1** - normal, app должен быть idempotent. Включи QoS 2 если действительно надо exactly-once. - **Retained-сообщение "залипло"** - publish empty payload с retain=true очистит. - **High message rate упирает broker в потолок** - mosquitto single-thread, пишет всё в один лог. Для больших нагрузок - EMQX/HiveMQ кластер. - **Wildcards `home/+/+/temperature` нагружают broker** - очень дорого matching, особенно с миллионами клиентов. Делай topics плоскими. ## Где почитать - https://mqtt.org/ - спецификации - https://www.hivemq.com/blog/mqtt-essentials-part-1-introducing-mqtt/ - серия HiveMQ "MQTT Essentials" (best intro) - https://mosquitto.org/man/ - mosquitto клиент-tools ## Команды ```bash mosquitto_sub -h localhost -t 'home/#' -v ``` Subscribe на все топики под home/, выводить topic + payload ```bash mosquitto_pub -h localhost -t home/temperature -m 21.5 -q 1 -r ``` Publish с QoS 1 + retain (broker запомнит как last-known) ```bash mosquitto_pub -h broker -p 8883 --cafile ca.pem --cert client.crt --key client.key -t devices/d1/temp -m 21.5 ``` MQTT через TLS с mutual auth (mTLS) ```bash mosquitto_sub -t '$SYS/#' -h localhost ``` Read mosquitto-internal stats: connections, msg rate, uptime ($SYS-tree) ```bash mosquitto -c /etc/mosquitto/mosquitto.conf -v ``` Запустить broker в foreground с verbose-логом ```bash mosquitto_pub -t devices/d1/status --will-topic devices/d1/status --will-payload offline --will-retain ``` Publish + установить LWT - на abnormal disconnect broker запостит 'offline' ```bash tshark -Y mqtt -i eth0 ``` Wireshark dissector для MQTT - видеть фреймы и QoS ```bash emqx ctl listeners ``` В EMQX - показать active listeners (TCP, TLS, WS, WSS) ## См. также - [TCP three-way handshake](/kb/tcp-handshake.md) - [TLS handshake](/kb/tls-handshake.md) - [TLS-сертификаты - X.509, цепочка доверия, Let's Encrypt](/kb/tls-certificates.md) - [CoAP - REST для constrained-устройств поверх UDP](/kb/coap.md) - [WebSocket - bidirectional поверх HTTP](/kb/websocket.md)