MQTT与EMQ

news/2025/2/23 22:09:19

文章目录

  • 1 MQTT协议与EMQ中间件
    • 1.1 物联网消息协议MQTT
      • 1.1.1 什么是MQTT
      • 1.1.2 MQTT相关概念
      • 1.1.3 消息服务质量QoS——信息的可靠投递
        • 1.1.3.1 QoS0——消息服务质量为0,消息发送至多一次
        • 1.1.3.2 QoS1——消息发送至少一次
        • 1.1.3.3 QoS2——消息发送仅一次
        • 1.1.3.4 不同情况下客户端收到的消息QoS
      • 1.1.4 topic通配符匹配规则
        • 1.1.4.1 层级分隔符:/
        • 1.1.4.2 多层通配符:#
        • 1.1.4.3 单层通配符:+
      • 1.1.5 MQTT优点
    • 1.2 物联网消息中间件EMQX
      • 1.2.1 什么是EMQX
      • 1.2.2 EMQ环境安装——基于Docker
      • 1.2.3 EMQ客户端
        • 1.2.3.1 EMQDashboard
        • 1.2.3.2 EMQTTX
      • 1.2.4 延迟消息
      • 1.2.5 共享订阅
        • 1.2.5.1 不带群组的共享订阅
        • 1.2.5.2 带群组的共享订阅
    • 1.3 Eclipse Paho
      • 1.3.1 Eclipse Paho是什么
      • 1.3.2 Eclipse Paho快速入门
        • 1.3.2.1 集成Eclipse paho
        • 1.3.2.2 发布消息到EMQ
        • 1.3.2.3订阅消息

1 MQTT协议与EMQ中间件

1.1 物联网消息协议MQTT

1.1.1 什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议):其时基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。
在这里插入图片描述

客户端服务端(消息代理Broker)
发布其他客户端可能会订阅的消息接收来自客户端的网络连接
订阅其他客户端发布的消息接收客户点发布的应用消息
退订或删除应用程序的消息处理来自客户端的订阅和退订请求
断开与服务器的连接向订阅的客户转发应用和程序消息

1.1.2 MQTT相关概念

MQTT常用方法解释
CONNECT客户端连接到服务器
CONNACK连接确认
PUBLISH发布消息
PUBACK发布确认
PUBREC发布的消息已接收
PUBREL发布的消息已释放
PUBCOMP发布完成
SUBSCRIBE订阅请求
SUBACK订阅确认
UNSUBSCRIBE取消订阅
UNSUBACK取消订阅确认
PINGREQ客户端发送心跳
PINGRESP服务端心跳响应
DISCONNECT断开连接
AUTH认证

1.1.3 消息服务质量QoS——信息的可靠投递

MQTT协议中规定了消息服务质量(QoS),其保证了在不同网络环境下信息传递的可靠性。MQTT设计了QoS0、QoS1、QoS2三个QoS等级。

QoS等级说明注意
QoS0消息最多传递一次消息发布完全依赖底层TCP/IP网络,会发生消息丢失,消息不会被接收端应答,也不会被发送者存储再发送,称之为“即发即弃
QoS1消息至少传递一次
QoS2消息传递一次

1.1.3.1 QoS0——消息服务质量为0,消息发送至多一次

消息服务质量为0(QoS0)消息发送至多一次,消息发布完全依赖底层TCP/IP网络,会发生消息丢失,消息不会被接收端应答,也不会被发送者存储再发送,称之为“即发即弃

在这里插入图片描述

1.1.3.2 QoS1——消息发送至少一次

消息服务质量为1(QoS1)消息发送至少一次确保消息送达,但可能发生消息重复投递,发送者会存储消息直到接收者反馈回Puback(发布确认)格式的应答确认

在这里插入图片描述

1.1.3.3 QoS2——消息发送仅一次

消息服务质量为2(QoS2)消息发送仅一次确保消息到达一次,他将相应的处理Publish(发布)消息,并通过Pubrec(发布收到)向发送方确认。

在这里插入图片描述

1.1.3.4 不同情况下客户端收到的消息QoS

发布消息的QoS主题订阅的QoS接收消息的QoS
000
010
020
121
211
222

总结由上表得出:若,发布的消息QoS为m,主题订阅的消息QoS为m,接收消息的QoS为h时:

  • ① 若m=0或n=0,则h=0;
  • ② 若0<m< n ,则h=n;
  • ③ 若0<n<m,则h=n。

1.1.4 topic通配符匹配规则

1.1.4.1 层级分隔符:/

层级分隔符 / :用来分割主题树的每一层,为主题(Topic)空间提供分等级结构,适用于:当两个通配符在一个主题中出现的时候。
例如:love/you/with/all/my

1.1.4.2 多层通配符:#

多层通配符 # :多层通配符表示≥0的层次。因此,love/#也可以匹配到单独的love,此时#代表love后边没有的0层。
注意:多层通配符必须是主题树最后一个字符,例如:love/#可以匹配到love,但是love/#/with是无效的

1.1.4.3 单层通配符:+

单层通配符 + :只匹配一层,例如:love/you/#匹配love/you/with,但是不能匹配love/you/with/all,他只额能通配一个字符。

1.1.5 MQTT优点

① 精简,不添加冗余功能;② 发布/订阅模式,方便消息在传感器间传递,客户端与服务端完成解耦;③ 动态创建主题(不需要预先创建主题),零运维成本。④支持连续的会话保持和控制(心跳检测);⑤ 提供服务质量(quality of service level:QoS)管理;⑥ 不强求传输数据格式与类型。

1.2 物联网消息中间件EMQX

1.2.1 什么是EMQX

EMQ X Broker 是基于高并发的Erlang/OTP语言平台开发,支持百万级连接与分布式集群架构,基于MQTT协议的消息服务器
参考EMQX官网
在这里插入图片描述

EMQX特点:

  • ① 基于MQTT协议实现得开源消息中间件,②支持桥接和共享订阅,③中国本地技术支持服务。

1.2.2 EMQ环境安装——基于Docker

 docker pull emqx/emqx:v4.1.0
 docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083  emqx/emqx:v4.1.0

1.2.3 EMQ客户端

1.2.3.1 EMQDashboard

MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端

1.2.3.2 EMQTTX

EMQ X 提供 Dashboard客户端方便用户管理设备与监控相关指标。
默认用户名是 admin,密码是 public

1.2.4 延迟消息

步骤:

  • ①用户需开启模块的emqx_mod_delayed
    在这里插入图片描述
  • ②主题格式:$delayed/{DelayInterval}/{TopicName}

注意
① 使用 $delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。
② {DelayInterval}: 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。
③ {TopicName}: MQTT 消息的主题名称。

1.2.5 共享订阅

注意:分组发送是每一个组里选一个,因此十分适合微服务集群。

1.2.5.1 不带群组的共享订阅

格式:$queue/{TopicName}
在这里插入图片描述

EMQ X的共享订阅支持均衡策略配置(默认Random): etc/emqx.con在这里插入图片描述

1.2.5.2 带群组的共享订阅

$share/<group-name>/{TopicName}

在这里插入图片描述

1.3 Eclipse Paho

1.3.1 Eclipse Paho是什么

Eclipse paho 实现mqtt协议java客户端。类似于Mysql于JDBC。

1.3.2 Eclipse Paho快速入门

1.3.2.1 集成Eclipse paho

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

1.3.2.2 发布消息到EMQ

java">@GetMapping("/publish")
public void publish() throws MqttException {

    MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
    MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);

    //连接选项中定义用户名密码和其它配置
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
    options.setAutomaticReconnect(true);//是否自动重连
    options.setConnectionTimeout(30);//连接超时时间  秒
    options.setKeepAliveInterval(10);//连接保持检查周期  秒
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
    
    client.connect(options);//连接
    client.publish("topic", "发送内容".getBytes(), 2, false);

}

1.3.2.3订阅消息

java">@GetMapping("/subscribe")
public void subscribe() throws MqttException {

    MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
    MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);

    //连接选项中定义用户名密码和其它配置
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
    options.setAutomaticReconnect(true);//是否自动重连
    options.setConnectionTimeout(30);//连接超时时间  秒
    options.setKeepAliveInterval(10);//连接保持检查周期  秒
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本

    client.setCallback(new MqttCallbackExtended() {
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("连接丢失!");
        }

        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println( "接收到消息  topic:" +s+"  id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }

        @Override
        public void connectComplete(boolean b, String s) {
            System.out.println("连接成功!");
        }
    });
    client.connect(options);//连接
    client.subscribe("test");  //订阅主题

}

http://www.niftyadmin.cn/n/399185.html

相关文章

Docker镜像与Docker容器详解

文章目录 Docker帮助信息Docker镜像拉取镜像镜像仓库官方与非官方镜像仓库 拉取指定标签的镜像为镜像打多个标签搜索镜像镜像和分层共享镜像层根据摘要拉取镜像删除镜像总结 Docker容器启动一个简单的容器容器进程容器生命周期优雅的退出容器利用重启策略进行容器的自我修复Web…

MongoDB(学习笔记1.0)

最近在学非关系型数据库MongoDB&#xff0c;猛地用起来的真的没关系型数据库方便啊。 首先还是数据库的安装&#xff1a; 安装直接去官网安装即可&#xff0c;官网地址&#xff1a;MongoDB: The Developer Data Platform | MongoDB 当前也有免安装版的&#xff0c;这里就不再…

斩获阿里offer,这份258页面试宝典也太顶了....

测试三年有余&#xff0c;很多新学到的技术不能再项目中得到实践&#xff0c;同时薪资的涨幅很低&#xff0c;于是萌生了跳槽大厂的想法 但大厂不是那么容易进的&#xff0c;前面惨败字节&#xff0c;为此我辛苦准备了两个月&#xff0c;又从小公司开始面试了半个月有余&#…

详情API接口代购系统的运用PHP源码

随着电商平台的快速发展和普及&#xff0c;越来越多的人开始选择在网上购物。在这一过程中&#xff0c;电商详情API接口扮演着重要的作用&#xff0c;因为它们能够提供关于商品或服务的详细信息&#xff0c;使得用户能够更加全面地了解产品和服务&#xff0c;方便用户进行购买决…

onnx模型的修改与调试demo

主要参考&#xff1a; 模型部署入门教程&#xff08;五&#xff09;&#xff1a;ONNX 模型的修改与调试 第五章&#xff1a;ONNX 模型的修改与调试 使用netron 可视化模型 读写onnx 构造onnx 创建一个描述线性函数 output axb 的onnx模型。 需要两个节点&#xff0c;第一个…

docker-compose中links和depends_on关键字学习总结

links link关键字用于在不同的容器之间创建网络链接。 它允许一个容器能够访问另一个容器的网络连接信息&#xff08;如IP地址和端口&#xff09;。 使用方法&#xff1a;在Docker Compose配置文件中&#xff0c;通过links关键字定义容器之间的链接关系。例如&#xff1a; ve…

XSS跨站脚本安全漏洞防护

文章目录 1 跨站脚本1.1 存储型XSS1.2 反射型XSS 2 、案例2.1 通过正则表达式替换跨站脚本2.2 构建请求的代理类&#xff0c;在构造方法中对请求中的内容进行分析2.3 构建响应的代理类2.4 通过Filter过滤掉请求和响应中的跨站脚本 3 测试3.1 在接口的body参数中添加一个脚本3.2…

深浅拷贝及赋值区别理解

浅拷贝&#xff1a; 对象浅拷贝Object.assign() 数组的浅拷贝Array.prototype.slice()与Array.prototype.concat() 解构赋值 1.对于数组/对象中基本数据类型[123,234]&#xff0c;拷贝的是数值&#xff0c;所以修改拷贝后的这个值&#xff0c;原数据不会改变&#xff1b;2.对…