一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连
作者:mmseoamin日期:2023-12-13

一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布

简介: 之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT?

之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT?

一、MQTT介绍

1.1 什么是MQTT?

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第1张

MQTT具有协议简洁、轻巧、可扩展性强、低开销、低带宽占用等优点,已经有PHP,JAVA,Python,C,C#,Go等多个语言版本,基本可以使用在任何平台上。在物联网、小型设备、移动应用等方面有较广泛的应用,特别适合用来当做物联网的通信协议。

1.2 MQTT特点

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

MQTT协议是为硬件性能有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  • 1.使用发布/订阅消息模式,提供多对多的消息发布,解除应用程序耦合;

  • 2.对负载内容屏蔽的消息传输;

  • 3.使用TCP/IP 提供网络连接;

  • 4.支持三种消息发布服务质量(QoS):

  • QoS 0(最多一次):消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这个级别可用于如下情况,环境传感器数据,丢失一次数据无所谓,因为不久后还会有第二次发送。

  • QoS 1(至少一次):确保消息到达,但消息重复可能会发生。

  • QoS 2(只有一次):确保消息到达一次。这个级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

  • 5.传输数据小,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;(用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。)

    1.3 MQTT应用场景

    MQTT作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有着广泛的应用。MQTT服务只负责消息的接收和传递,应用系统连接到MQTT服务器后,可以实现采集数据接收、解析、业务处理、存储入库、数据展示等功能。常见的应用场景主要有以下几个方面:

    (1)消息推送: 如PC端的推送公告,比如安卓的推送服务,还有一些即时通信软件如微信、易信等也是采用的推送技术。

    (2)智能点餐: 通过MQTT消息队列产品,消费者可在餐桌上扫码点餐,并与商家后端系统连接实现自助下单、支付。

    (3)信息更新: 实现商场超市等场所的电子标签、公共场所的多媒体屏幕的显示更新管理。

    (4)扫码出站: 最常见的停车场扫码缴费,自动起竿;地铁闸口扫码进出站。

    二、MQTT的角色组成

    2.1 MQTT的客户端和服务端

    2.1.1 服务端(Broker)

    EMQX就是一个MQTT的Broker,emqx只是基于erlang语言开发的软件而已,其它的MQ还有ActiveMQ、RabbitMQ、HiveMQ等等。

    EMQX服务端:https://www.emqx.io/zh/downloads?os=Windows

    2.1.2 客户端(发布/订阅)

    EMQX客户端:https://mqttx.app/zh

    这个是用来测试验证的客户端,实际项目是通过代码来实现我们消息的生产者和消费者。

    2.2 MQTT中的几个概念

    相比RabbitMQ等消息队列,MQTT要相对简单一些,只有Broker、Topic、发布者、订阅者等几部分构成。接下来我们先简单整理下MQTT日常使用中最常见的几个概念:

    • 1.Topic主题:MQTT消息的主要传播途径, 我们向主题发布消息, 订阅主题, 从主题中读取消息并进行.业务逻辑处理, 主题是消息的通道
    • 2.生产者:MQTT消息的发送者, 他们向主题发送消息
    • 3.消费者:MQTT消息的接收者, 他们订阅自己需要的主题, 并从中获取消息
    • 4.broker服务:消息转发器, 消息是通过它来承载的, EMQX就是我们的broker, 在使用中我们不用关心它的具体实现

      其实, MQTT的使用流程就是: 生产者给broker的某个topic发消息->broker通过topic进行消息的传递->订阅该主题的消费者拿到消息并进行相应的业务逻辑

      三、EMQX的安装和使用

      下面以Windows为例,演示Windows下如何安装和使用EXQX。

      step 1:下载EMQ安装包,配置EMQ环境

      EMQX服务端:https://www.emqx.io/zh/downloads?os=Windows

      step 2:下载压缩包解压,cmd进入bin文件夹

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第2张

      step 3:启动EMQX服务

      在命令行输入:emqx start 启动服务,打卡浏览器输入:http://localhost:18083/ 进入登录页面。默认用户名密码 admin/public 。登录成功后,会进入emqx的后台管理页面,如下图所示:

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第3张

      四、使用SpringBoot整合MQTT协议

      前面介绍了MQTT协议以及如何安装和启动MQTT服务。接下来演示如何在SpringBoot项目中整合MQTT实现消息的订阅和发布。

      4.1 创建工程

      首先,创建spring-boot-starter-mqtt父工程,在父工程下分别创建消息的提供者spring-boot-starter-mqtt-provider 模块和消息的消费者spring-boot-starter-mqtt-consumer模块。

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第4张

      4.2 实现生产者

      接下来,修改生产者模块spring-boot-starter-mqtt-provider 相关的代码,实现消息发布的功能模块。

      4.2.1 导入依赖包

      修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示:

      
              
                  org.springframework.boot
                  spring-boot-starter
              
              
                  org.springframework.boot
                  spring-boot-starter-web
              
              
                  org.springframework.boot
                  spring-boot-starter-test
                  test
              
              
                  org.springframework.integration
                  spring-integration-mqtt
                  5.3.2.RELEASE
              
              
                  org.projectlombok
                  lombok
                  1.18.4
              
          
      
      4.2.2 修改配置文件

      修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示:

      spring:
        application:
          name: provider
          #MQTT配置信息
        mqtt:
          #MQTT服务地址,端口号默认11883,如果有多个,用逗号隔开
          url: tcp://127.0.0.1:11883
          #用户名
          username: admin
          #密码
          password: public
          #客户端id(不能重复)
          client:
            id: provider-id
          #MQTT默认的消息推送主题,实际可在调用接口是指定
          default:
            topic: topic
      server:
        port: 8080
      
      4.2.3 消息生产者客户端配置

      创建MqttProviderConfig配置类,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示:

      import lombok.extern.slf4j.Slf4j;
      import org.eclipse.paho.client.mqttv3.*;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Configuration;
      import javax.annotation.PostConstruct;
      @Configuration
      @Slf4j
      public class MqttProviderConfig {
          @Value("${spring.mqtt.username}")
          private String username;
          @Value("${spring.mqtt.password}")
          private String password;
          @Value("${spring.mqtt.url}")
          private String hostUrl;
          @Value("${spring.mqtt.client.id}")
          private String clientId;
          @Value("${spring.mqtt.default.topic}")
          private String defaultTopic;
          /**
           * 客户端对象
           */
          private MqttClient client;
          /**
           * 在bean初始化后连接到服务器
           */
          @PostConstruct
          public void init(){
              connect();
          }
          /**
           * 客户端连接服务端
           */
          public void connect(){
              try{
                  //创建MQTT客户端对象
                  client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
                  //连接设置
                  MqttConnectOptions options = new MqttConnectOptions();
                  //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
                  //设置为true表示每次连接服务器都是以新的身份
                  options.setCleanSession(true);
                  //设置连接用户名
                  options.setUserName(username);
                  //设置连接密码
                  options.setPassword(password.toCharArray());
                  //设置超时时间,单位为秒
                  options.setConnectionTimeout(100);
                  //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
                  options.setKeepAliveInterval(20);
                  //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
                  options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
                  //设置回调
                  client.setCallback(new MqttProviderCallBack());
                  client.connect(options);
              } catch(MqttException e){
                  e.printStackTrace();
              }
          }
          public void publish(int qos,boolean retained,String topic,String message){
              MqttMessage mqttMessage = new MqttMessage();
              mqttMessage.setQos(qos);
              mqttMessage.setRetained(retained);
              mqttMessage.setPayload(message.getBytes());
              //主题的目的地,用于发布/订阅信息
              MqttTopic mqttTopic = client.getTopic(topic);
              //提供一种机制来跟踪消息的传递进度
              //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
              MqttDeliveryToken token;
              try {
                  //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
                  //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
                  token = mqttTopic.publish(mqttMessage);
                  token.waitForCompletion();
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }
      }
      
      4.2.4 生产者客户端消息回调

      创建MqttProviderCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示:

      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      public class MqttConsumerCallBack implements MqttCallback{
          /**
           * 客户端断开连接的回调
           */
          @Override
          public void connectionLost(Throwable throwable) {
              System.out.println("与服务器断开连接,可重连");
          }
          /**
           * 消息到达的回调
           */
          @Override
          public void messageArrived(String topic, MqttMessage message) throws Exception {
              System.out.println(String.format("接收消息主题 : %s",topic));
              System.out.println(String.format("接收消息Qos : %d",message.getQos()));
              System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
              System.out.println(String.format("接收消息retained : %b",message.isRetained()));
          }
          /**
           * 消息发布成功的回调
           */
          @Override
          public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
              System.out.println(String.format("接收消息成功"));
          }
      }
      
      4.2.5 创建Controller控制器实现消息发布功能

      创建SendController控制器类,实现消息的发送功能,示例代码如下所示:

      @Controller
      public class SendController {
          @Autowired
          private MqttProviderConfig providerClient;
          @RequestMapping("/sendMessage")
          @ResponseBody
          public String sendMessage(int qos,boolean retained,String topic,String message){
              try {
                  providerClient.publish(qos, retained, topic, message);
                  return "发送成功";
              } catch (Exception e) {
                  e.printStackTrace();
                  return "发送失败";
              }
          }
      }
      

      4.3 实现消费者

      前面完成了生成者消息发布的模块,接下来修改消费者模块spring-boot-starter-mqtt-consumer实现消息订阅、处理的功能。

      4.3.1 导入依赖包

      修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示:

      
              
                  org.springframework.boot
                  spring-boot-starter
              
              
                  org.springframework.boot
                  spring-boot-starter-web
              
              
                  org.springframework.boot
                  spring-boot-starter-test
                  test
              
              
                  org.springframework.integration
                  spring-integration-mqtt
                  5.3.2.RELEASE
              
              
                  org.projectlombok
                  lombok
                  1.18.4
              
          
      
      4.3.2 修改配置文件

      修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示:

      spring:
        application:
          name: consumer
        #MQTT配置信息
        mqtt:
          #MQTT服务端地址,端口默认为11883,如果有多个,用逗号隔开
          url: tcp://127.0.0.1:11883
          #用户名
          username: admin
          #密码
          password: public
          #客户端id(不能重复)
          client:
            id: consumer-id
          #MQTT默认的消息推送主题,实际可在调用接口时指定
          default:
            topic: topic
      server:
        port: 8085
      
      4.3.3 消费者客户端配置

      创建消费者客户端配置类MqttConsumerConfig,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示:

      import javax.annotation.PostConstruct;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Configuration;
      @Configuration
      public class MqttConsumerConfig {
          @Value("${spring.mqtt.username}")
          private String username;
          @Value("${spring.mqtt.password}")
          private String password;
          @Value("${spring.mqtt.url}")
          private String hostUrl;
          @Value("${spring.mqtt.client.id}")
          private String clientId;
          @Value("${spring.mqtt.default.topic}")
          private String defaultTopic;
          /**
           * 客户端对象
           */
          private MqttClient client;
          /**
           * 在bean初始化后连接到服务器
           */
          @PostConstruct
          public void init(){
              connect();
          }
          /**
           * 客户端连接服务端
           */
          public void connect(){
              try {
                  //创建MQTT客户端对象
                  client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
                  //连接设置
                  MqttConnectOptions options = new MqttConnectOptions();
                  //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
                  //设置为true表示每次连接到服务端都是以新的身份
                  options.setCleanSession(true);
                  //设置连接用户名
                  options.setUserName(username);
                  //设置连接密码
                  options.setPassword(password.toCharArray());
                  //设置超时时间,单位为秒
                  options.setConnectionTimeout(100);
                  //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
                  options.setKeepAliveInterval(20);
                  //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
                  options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
                  //设置回调
                  client.setCallback(new MqttConsumerCallBack());
                  client.connect(options);
                  //订阅主题
                  //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
                  int[] qos = {1,1};
                  //主题
                  String[] topics = {"topic1","topic2"};
                  //订阅主题
                  client.subscribe(topics,qos);
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }
          /**
           * 断开连接
           */
          public void disConnect(){
              try {
                  client.disconnect();
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }
          /**
           * 订阅主题
           */
          public void subscribe(String topic,int qos){
              try {
                  client.subscribe(topic,qos);
              } catch (MqttException e) {
                  e.printStackTrace();
              }
          }
      }
      
      4.3.4 消费者客户端消息回调

      创建MqttConsumerCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示:

      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      public class MqttConsumerCallBack implements MqttCallback{
          /**
           * 客户端断开连接的回调
           */
          @Override
          public void connectionLost(Throwable throwable) {
              System.out.println("与服务器断开连接,可重连");
          }
          /**
           * 消息到达的回调
           */
          @Override
          public void messageArrived(String topic, MqttMessage message) throws Exception {
              System.out.println(String.format("接收消息主题 : %s",topic));
              System.out.println(String.format("接收消息Qos : %d",message.getQos()));
              System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
              System.out.println(String.format("接收消息retained : %b",message.isRetained()));
          }
          /**
           * 消息发布成功的回调
           */
          @Override
          public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
              System.out.println(String.format("接收消息成功"));
          }
      }
      
      4.3.5 创建Controller控制器,实现MQTT连接的建立和断开

      接下来,创建Controller控制器MqttController,并实现MQTT连接的建立和断开等方法。示例代码如下所示:

      import com.weiz.mqtt.config.MqttConsumerConfig;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Controller;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.ResponseBody;
      @Controller
      public class MqttController {
          @Autowired
          private MqttConsumerConfig client;
          @Value("${spring.mqtt.client.id}")
          private String clientId;
          @RequestMapping("/connect")
          @ResponseBody
          public String connect(){
              client.connect();
              return clientId + "连接到服务器";
          }
          @RequestMapping("/disConnect")
          @ResponseBody
          public String disConnect(){
              client.disConnect();
              return clientId + "与服务器断开连接";
          }
      }
      

      4.4 测试验证

      首先,分别启动生产者spring-boot-starter-mqtt-provider 和消费者spring-boot-starter-mqtt-consumer两个项目,打开浏览器,输入地址http://localhost:18083/,在EMQX管理界面可以看到连接上来的两个客户端。如下图所示:

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第5张

      接下来,调用生产者的消息发布接口验证消息发布是否成功。使用Pomstman调用消息发送接口:http://localhost:8080/sendMessage ,如下图所示:

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第6张

      通过上图可以发现,生产者模块已经把消息发送成功。接下来查看消费者模块,验证消息是否处理成功。如下图所示:

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,image.png,第7张

      通过日志输出可以发现,消费者已经成功接收到生产者发送的消息,说明我们成功实现在Spring Boot项目中整合MQTT实现了消息的发布和订阅的功能。

      最后

      以上就是如何在Spring Boot中使用MQTT的详细内容,更多关于在Spring Boot中MQTT的使用大家可以去自己研究学习。比如:如何利用qos机制保证数据不会丢失?消息的队列和排序?集群模式下的应用?等等。



      MQTT介绍与使用

      正文

      物联网是新一代信息技术的重要组成部分,也是“信息化”时代的重要发展阶段。其英文名称是:“Internet of things(IoT)”。顾名思义,物联网就是物物相连的互联网。这有两层意思:其一,物联网的核心和基础仍然是互联网,是在互联网基础上的延伸和扩展的网络;其二,其用户端延伸和扩展到了任何物品与物品之间,进行信息交换和通信,也就是物物相息。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。

      而在物联网的应用上,对于信息传输,MQTT是一种再合适不过的协议工具了。

      一、MQTT简介

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

      MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

      二、特性

      MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

      (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

      (2)对负载内容屏蔽的消息传输。

      (3)使用TCP/IP提供网络连接。

      主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

      (4)有三种消息发布服务质量:

      “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

      “至少一次”,确保消息到达,但消息重复可能会发生。

      “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

      (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

      这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

      三、实现方式

      实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

      MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

      (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

      (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

      四、MQTT的搭建(ubuntu)

      1、apt-get安装mqtt相关包

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第8张

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第9张

      2、测试mosquitto是否正确运行

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第10张

      3、本机终端测试mqtt

      打开一个终端,订阅主题

      mosquitto_sub -h 192.168.1.102 -t "mqtt" -v
      

      【-h】指定要连接的MQTT服务器

        【-t】订阅主题,此处为mqtt

        【-v】打印更多的调试信息

      再打开一个终端,发布主题

      mosquitto_pub -h 192.168.1.102 -t "mqtt" -m "Hello Stonegeek"
      

      【-h】指定要连接的MQTT服务器

        【-t】向指定主题推送消息

        【-m】指定消息内容

      结果展示

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第11张

      五、MQTT权限配置

      前面我们基于Mosquitto服务器已经搭建成功了,但是默认是允许匿名用户登录,对于正式上线的项目则是需要进行用户认证(当然,用户一般都会与数据库映射,不过在这里我们就会直接将用户写入配置文件中)

      1、Mosquitto服务器的配置文件为/etc/mosquitto/mosquitto.conf,关于用户认证的方式和读取的配置都在这个文件中进行

      配置文件参数说明:

      IDallow_anonymouspassword_fileacl_fileresult
      1True(默认)允许匿名方式登录
      2Falsepassword_file开启用户验证机制
      3Falsepassword_fileacl_file开启用户验证机制,但访问控制不起作用
      4Truepassword_fileacl_file用户名及密码不为空,将自动进行用户验证且受到访问控制的限制;用户名及密码为空,将不进行用户验证且受到访问控制的限制
      5False无法启动服务

      allow_anonymous允许匿名

      password-file密码文件

      acl_file访问控制列表

      2、修改配置文件

      命令:sudo vi /etc/mosquitto/mosquitto.conf

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第12张

      3、添加用户信息

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第13张

      命令解释: -c 创建一个用户、/etc/mosquitto/pwfile.example 是将用户创建到 pwfile.example 文件中、admin 是用户名。

      同样连续会提示连续输入两次密码。注意第二次创建用户时不用加 -c 如果加 -c 会把第一次创建的用户覆盖。

      至此两个用户创建成功,此时如果查看 pwfile.example 文件会发现其中多了两个用户。

      4、添加Topic和用户的关系

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第14张

      5、用户认证测试

      (1)重启Mosquitto步骤

      查看mosquitto的进程

      命令:ps -aux|grep mosquitto

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第15张

      (2)杀死进程

      命令:sudo kill -9 pid

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第16张

      (3)启动

      命令:mosquitto -c /etc/mosquitto/mosquitto.conf

      (4)订阅端启动(不加用户)

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第17张

      订阅端启动(加用户)

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第18张

      (5)发布端启动

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第19张

      六、MQTT实现(Java语言)

      注意:由于我们在上面配置了MQTT的用户权限控制,所以下面的用户只能使用stonegeek登录,否则项目会运行报错,而且我们在上面设置的访问控制列表中只有mtopic主题,所以我们必须使用此主题,否则,订阅者会收不到已发布的主题内容(已经测试过了)

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第20张

      下面是我们Java语言实现的MQTT服务的发布/订阅

      1、添加Maven依赖

          
            org.eclipse.paho
            org.eclipse.paho.client.mqttv3
            1.1.1
          
      

      2、ServerMQTT.class

      package com.stonegeek;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
      import org.eclipse.paho.client.mqttv3.MqttTopic;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      public class ServerMQTT {
          //tcp://MQTT安装的服务器地址:MQTT定义的端口号
          public static final String HOST = "tcp://192.168.1.102:1883";
          //定义一个主题
          public static final String TOPIC = "mtopic";
          //定义MQTT的ID,可以在MQTT服务配置中指定
          private static final String clientid = "server11";
          private MqttClient client;
          private MqttTopic topic11;
          private String userName = "stonegeek";
          private String passWord = "123456";
          private MqttMessage message;
          /**
           * 构造函数
           * @throws MqttException
           */
          public ServerMQTT() throws MqttException {
              // MemoryPersistence设置clientid的保存形式,默认为以内存保存
              client = new MqttClient(HOST, clientid, new MemoryPersistence());
              connect();
          }
          /**
           *  用来连接服务器
           */
          private void connect() {
              MqttConnectOptions options = new MqttConnectOptions();
              options.setCleanSession(false);
              options.setUserName(userName);
              options.setPassword(passWord.toCharArray());
              // 设置超时时间
              options.setConnectionTimeout(10);
              // 设置会话心跳时间
              options.setKeepAliveInterval(20);
              try {
                  client.setCallback(new PushCallback());
                  client.connect(options);
                  topic11 = client.getTopic(TOPIC);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
          /**
           *
           * @param topic
           * @param message
           * @throws MqttPersistenceException
           * @throws MqttException
           */
          public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
                  MqttException {
              MqttDeliveryToken token = topic.publish(message);
              token.waitForCompletion();
              System.out.println("message is published completely! "
                      + token.isComplete());
          }
          /**
           *  启动入口
           * @param args
           * @throws MqttException
           */
          public static void main(String[] args) throws MqttException {
              ServerMQTT server = new ServerMQTT();
              server.message = new MqttMessage();
              server.message.setQos(1);
              server.message.setRetained(true);
              server.message.setPayload("hello,topic11".getBytes());
              server.publish(server.topic11 , server.message);
              System.out.println(server.message.isRetained() + "------ratained状态");
          }
      }
      

      3、ClientMQTT.class

      package com.stonegeek;
      import java.util.concurrent.ScheduledExecutorService;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.MqttTopic;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      public class ClientMQTT {
          public static final String HOST = "tcp://192.168.1.102:1883";
          public static final String TOPIC = "mtopic";
          private static final String clientid = "client11";
          private MqttClient client;
          private MqttConnectOptions options;
          private String userName = "stonegeek";
          private String passWord = "123456";
          private ScheduledExecutorService scheduler;
          private void start() {
              try {
                  // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                  client = new MqttClient(HOST, clientid, new MemoryPersistence());
                  // MQTT的连接设置
                  options = new MqttConnectOptions();
                  // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                  options.setCleanSession(true);
                  // 设置连接的用户名
                  options.setUserName(userName);
                  // 设置连接的密码
                  options.setPassword(passWord.toCharArray());
                  // 设置超时时间 单位为秒
                  options.setConnectionTimeout(10);
                  // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                  options.setKeepAliveInterval(20);
                  // 设置回调
                  client.setCallback(new PushCallback());
                  MqttTopic topic = client.getTopic(TOPIC);
                  //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
                  options.setWill(topic, "close".getBytes(), 2, true);
                  client.connect(options);
                  //订阅消息
                  int[] Qos  = {1};
                  String[] topic1 = {TOPIC};
                  client.subscribe(topic1, Qos);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
          public static void main(String[] args) throws MqttException {
              ClientMQTT client = new ClientMQTT();
              client.start();
          }
      }
      

      4、PushCallback.class

      package com.stonegeek;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      /**
       * Created by StoneGeek on 2018/6/5.
       * 博客地址:http://www.cnblogs.com/sxkgeek
       * 发布消息的回调类 
       *
       * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 
       * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 
       * 在回调中,将它用来标识已经启动了该回调的哪个实例。 
       * 必须在回调类中实现三个方法: 
       *
       *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 
       *
       *  public void connectionLost(Throwable cause)在断开连接时调用。 
       *
       *  public void deliveryComplete(MqttDeliveryToken token)) 
       *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 
       *  由 MqttClient.connect 激活此回调。 
       */
      public class PushCallback implements MqttCallback{
          public void connectionLost(Throwable cause) {
              // 连接丢失后,一般在这里面进行重连
              System.out.println("连接断开,可以做重连");
          }
          public void deliveryComplete(IMqttDeliveryToken token) {
              System.out.println("deliveryComplete---------" + token.isComplete());
          }
          public void messageArrived(String topic, MqttMessage message) throws Exception {
              // subscribe后得到的消息会执行到这里面
              System.out.println("接收消息主题 : " + topic);
              System.out.println("接收消息Qos : " + message.getQos());
              System.out.println("接收消息内容 : " + new String(message.getPayload()));
          }
      }
      

      5、结果展示

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第21张

      一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连,img,第22张



      MQTT 客户端重连

      MQTT客户端重连主要有两种方法

      第一种:自动重连

      设置org.eclipse.paho.client.mqttv3.MqttConnectOptions#setAutomaticReconnect为true

      MqttConnectOptions options = new MqttConnectOptions();
      options.setAutomaticReconnect(true);
      

      然后callback实现org.eclipse.paho.client.mqttv3.MqttCallbackExtended,这个接口提供了一个连接完成的回调方法,连接完成后做一些操作,如订阅主题。

      public MyMqttCallback implements MqttCallbackExtended {
              /**
               * 连接成功会进入到这里
               * @param reconnect
               * @param serverURI
               */
              @Override
              public void connectComplete(boolean reconnect, String serverURI) {
      			// 可以做订阅主题
              }
      }
      

      这样当断开连接后,MqttClient内部会自动进行重连,每次连接成功后都会回调connectComplete()方法。

      第二种:自定义重连

      如果不指定setAutomaticReconnect为true

      可以在public void connectionLost(Throwable throwable)中处理重连

      @Override
          public void connectionLost(Throwable throwable) {
              System.out.println("失去连接:" + throwable.getMessage());
              int times = 1;
              while (!client.isConnected()) {
                  try {
                      System.out.println("重新连接, 第" + (times++) + "次");
                      client.reconnect();
                      System.out.println("重连成功");
                      break;
                  } catch (MqttException e) {
                      e.printStackTrace();
                      System.out.println("重连失败, msg:" + e.getMessage());
                  }
                  // 每隔10秒重试一次
                  try {
                      TimeUnit.SECONDS.sleep(10);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              // 执行到这里,说明连接成功,重新订阅主题
              xxx
          }