创建项目,引入 MQTT依赖:
org.springframework.boot spring-boot-starter-web 2.3.12.RELEASE org.springframework.integration spring-integration-mqtt 6.1.2 org.projectlombok lombok 1.18.28 com.alibaba fastjson 1.2.75
在 application.yml文件中,定义 MQTT连接信息。
## MQTT 基本连接参数 ## mqtt: host: tcp://192.168.xxx.xxx:1883 # host: tcp://broker.emqx.io:1883 userName: admin passWord: xxxxxx qos: 1 clientId: ClientId_local #ClientId_local必须唯一。 timeout: 10 # 超时时间 keepalive: 30 # 保持连接时间 clearSession: true # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息) topic1: A/b/# # 通配符主题只能用于订阅,不能用于发布。+:表示单层通配符,#:表示多层通配符 topic2: A/abc topic3: ABC
创建一个 MqttConfig配置类,并获取配置文件的 MQTT的连接参数。创建 MyMqttClient类注入Spring。
@Slf4j @Configuration public class MqttConfig { @Value("${mqtt.host}") public String host; @Value("${mqtt.username}") public String username; @Value("${mqtt.password}") public String password; @Value("${mqtt.clientId}") public String clientId; @Value("${mqtt.timeout}") public int timeOut; @Value("${mqtt.keepalive}") public int keepAlive; @Value("${mqtt.clearSession}") public boolean clearSession; @Value("${mqtt.topic1}") public String topic1; @Value("${mqtt.topic2}") public String topic2; @Value("${mqtt.topic3}") public String topic3; @Bean//注入Spring public MyMqttClient myMqttClient() { MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession); for (int i = 0; i < 10; i++) { try { myMqttClient.connect(); // 这里可以订阅主题,推荐放到 MqttCallbackExtended.connectComplete方法中 //myMqttClient.subscribe("ABC", 1); return myMqttClient; } catch (MqttException e) { log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); } } } return myMqttClient; } }
创建 MQTT 客户端封装类MyMqttClient。对 MQTT Broker进行操作。
@Slf4j public class MyMqttClient { /** * MQTT Broker 基本连接参数,用户名、密码为非必选参数 */ private String host; private String username; private String password; private String clientId; private int timeout; private int keepalive; private boolean clearSession; /** * MQTT 客户端 */ private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MyMqttClient.client = client; } public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) { this.host = host; this.username = username; this.password = password; this.clientId = clientId; this.timeout = timeOut; this.keepalive = keepAlive; this.clearSession = clearSession; } /** * 设置 MQTT Broker 基本连接参数 * * @param username * @param password * @param timeout * @param keepalive * @return */ public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); options.setCleanSession(clearSession); options.setAutomaticReconnect(true); return options; } /** * 连接 MQTT Broker,得到 MqttClient连接对象 */ public void connect() throws MqttException { if (client == null) { client = new MqttClient(host, clientId, new MemoryPersistence()); // 设置回调 client.setCallback(new MyMqttCallback(MyMqttClient.this)); } // 连接参数 MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive); if (!client.isConnected()) { client.connect(mqttConnectOptions); } else { client.disconnect(); client.connect(mqttConnectOptions); } log.info("== MyMqttClient ==> MQTT connect success");//未发生异常,则连接成功 } /** * 发布,默认qos为0,非持久化 * * @param pushMessage * @param topic */ public void publish(String pushMessage, String topic) { publish(pushMessage, topic, 0, false); } /** * 发布消息 * * @param pushMessage * @param topic * @param qos * @param retained:留存 */ public void publish(String pushMessage, String topic, int qos, boolean retained) { MqttMessage message = new MqttMessage(); message.setPayload(pushMessage.getBytes()); message.setQos(qos); message.setRetained(retained); MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); if (null == mqttTopic) { log.error("== MyMqttClient ==> topic is not exist"); } MqttDeliveryToken token;//Delivery:配送 synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 try { token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 token.waitForCompletion(1000L); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } } /** * 订阅某个主题,qos默认为0 * * @param topic */ public void subscribe(String topic) { subscribe(topic, 0); } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { MyMqttClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } log.info("== MyMqttClient ==> 订阅主题成功:topic = {}, qos = {}", topic, qos); } /** * 取消订阅主题 * * @param topic 主题名称 */ public void cleanTopic(String topic) { if (client != null && client.isConnected()) { try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } else { log.error("== MyMqttClient ==> 取消订阅失败!"); } log.info("== MyMqttClient ==> 取消订阅主题成功:topic = {}", topic); } }
说明:
创建一个 MqttClient回调类MyMqttCallback。
@Slf4j public class MyMqttCallback implements MqttCallbackExtended { //手动注入 private MqttConfig mqttConfig = SpringUtils.getBean(MqttConfig.class); private MyMqttClient myMqttClient; public MyMqttCallback(MyMqttClient myMqttClient) { this.myMqttClient = myMqttClient; } /** * MQTT Broker连接成功时被调用的方法。在该方法中可以执行 订阅系统约定的主题(推荐使用)。 * 如果 MQTT Broker断开连接之后又重新连接成功时,主题也需要再次订阅,将重新订阅主题放在连接成功后的回调方法中比较合理。 * * @param reconnect * @param serverURI MQTT Broker的url */ @Override public void connectComplete(boolean reconnect, String serverURI) { String connectMode = reconnect ? "重连" : "直连"; log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{},serverURI:{}", connectMode, serverURI); //订阅主题 myMqttClient.subscribe(mqttConfig.topic1, 1); myMqttClient.subscribe(mqttConfig.topic2, 1); myMqttClient.subscribe(mqttConfig.topic3, 1); ListtopicList = new ArrayList<>(); topicList.add(mqttConfig.topic1); topicList.add(mqttConfig.topic2); topicList.add(mqttConfig.topic3); log.info("== MyMqttCallback ==> 连接方式:{},订阅主题成功,topic:{}", connectMode, topicList); } /** * 丢失连接,可在这里做重连 * 只会调用一次 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.error("== MyMqttCallback ==> connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); long reconnectTimes = 1; while (true) { try { if (MyMqttClient.getClient().isConnected()) { //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功"); return; } reconnectTimes += 1; log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes); MyMqttClient.getClient().reconnect(); } catch (MqttException e) { log.error("== MyMqttCallback ==> mqtt断连异常", e); } try { Thread.sleep(5000); } catch (InterruptedException e1) { } } } /** * 接收到消息(subscribe订阅的主题消息)时被调用的方法 * * @param topic * @param mqttMessage * @throws Exception 后得到的消息会执行到这里面 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, new String(mqttMessage.getPayload())); /** * 根据订阅的主题分别处理业务。可以通过if-else或者策略模式来分别处理不同的主题消息。 */ //topic1主题 if (topic.equals("ABC")) { Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8)); //TODO 业务处理 //doSomething1(maps); log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},{}业务处理消息内容完成", topic, "TodoService1"); } //topic2主题 if (topic.equals("A/b/1qaz")) { Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8)); //TODO 业务处理 //doSomething2(maps); log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},{}业务处理消息内容完成", topic, "TodoService2"); } } /** * 消息发送(publish)完成时被调用的方法 * * @param iMqttDeliveryToken */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成,Complete= {}", iMqttDeliveryToken.isComplete()); } }
MqttCallback类方法说明:
MqttCallbackExtended类方法说明:该类继承MqttCallback类
@Component public class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware { /** * Spring应用上下文环境 */ private static ConfigurableListableBeanFactory beanFactory; private static ApplicationContext applicationContext; @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringUtils.applicationContext = applicationContext; } /** * 获取对象 * * @param name * @return Object 一个以所给名字注册的bean的实例 * @throws org.springframework.beans.BeansException */ @SuppressWarnings("unchecked") public staticT getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 获取类型为requiredType的对象 * * @param clz * @return * @throws org.springframework.beans.BeansException */ public static T getBean(Class clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注册对象的类型 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException */ public static Class> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果给定的bean名字在bean定义中有别名,则返回这些别名 * * @param name * @return * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } /** * 获取aop代理对象 * * @param invoker * @return */ @SuppressWarnings("unchecked") public static T getAopProxy(T invoker) { return (T) AopContext.currentProxy(); } /** * 获取当前的环境配置,无配置返回null * * @return 当前的环境配置 */ public static String[] getActiveProfiles() { return applicationContext.getEnvironment().getActiveProfiles(); } /** * 获取当前的环境配置,当有多个环境配置时,只获取第一个 * * @return 当前的环境配置 */ public static String getActiveProfile() { final String[] activeProfiles = getActiveProfiles(); if (activeProfiles == null) { return null; } return activeProfiles[0]; } }
到此,Springboot 通过 MqttClient整合操作 MQTT Broker就可以了。
我们在 service层创建一个 MqttService类,业务通过 MqttService类来统一操作 MqttClient。
这里创建一个 MyXxxMqttMsg类,来约定发送消息的载体类格式。
@Data public class MyXxxMqttMsg implements Serializable { private static final long serialVersionUID = -8303548938481407659L; /** * MD5值:MD5_lower(content + timestamp) */ private String md5; /** * 消息内容 */ private String content = ""; /** * 时间戳 */ private Long timestamp; }
1)接口:
public interface MqttService { /** * 添加订阅主题 * * @param topic 主题名称 */ void addTopic(String topic); /** * 取消订阅主题 * * @param topic 主题名称 */ void removeTopic(String topic); /** * 发布主题消息内容 * * @param msgContent * @param topic */ void publish(String msgContent, String topic); }
2)实现类:
@Service public class MqttServiceImpl implements MqttService { @Autowired private MyMqttClient myMqttClient; @Override public void addTopic(String topic) { myMqttClient.subscribe(topic); } @Override public void removeTopic(String topic) { myMqttClient.cleanTopic(topic); } @Override public void publish(String msgContent, String topic) { //MyXxxMqttMsg 转Json MyXxxMqttMsg myXxxMqttMsg = new MyXxxMqttMsg(); myXxxMqttMsg.setContent(msgContent); myXxxMqttMsg.setTimestamp(System.currentTimeMillis()); // TODO Md5值 myXxxMqttMsg.setMd5(UUID.randomUUID().toString()); String msgJson = JSON.toJSONString(myXxxMqttMsg); //发布消息 myMqttClient.publish(msgJson, topic); }
创建一个 MyMqttController类,来操作一下 MQTT。
@RestController @RequestMapping("/mqtt") @Api(value = "MyMqttController", tags = {"MQTT相关操作接口"}) public class MyMqttController { @Autowired private MqttService mqttService; @GetMapping("/addTopic") @ApiOperation(value = "添加订阅主题接口") public void addTopic(String topic) { mqttService.addTopic(topic); } @GetMapping("/removeTopic") @ApiOperation(value = "取消订阅主题接口") public void removeTopic(String topic) { mqttService.removeTopic(topic); } @PostMapping("/removeTopic") @ApiOperation(value = "发布主题消息内容接口") public void removeTopic(String msgContent, String topic) { mqttService.publish(msgContent, topic); } }
订阅和取消主题操作:MQTTX发布了一个主题消息。
发布通配符主题消息:
– 求知若饥,虚心若愚。