🏆 作者简介,愚公搬代码
🏆《头衔》:华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,51CTO博客专家等。
🏆《近期荣誉》:2022年CSDN博客之星TOP2,2022年华为云十佳博主等。
🏆《博客内容》:.NET、Java、Python、Go、Node、前端、IOS、Android、鸿蒙、Linux、物联网、网络安全、大数据、人工智能、U3D游戏、小程序等相关领域知识。
🏆🎉欢迎 👍点赞✍评论⭐收藏
微服务中的消息驱动是一种基于消息传递的方式,用于实现微服务之间的通信和协作。它通过异步发送和接收消息,可以降低系统耦合度、提高系统可扩展性和可靠性。
常用的消息驱动技术包括消息队列、消息总线和事件驱动架构。使用消息队列时,微服务可以将消息发送到队列中,其他微服务可以订阅这些消息并进行处理。使用消息总线时,微服务可以将消息发布到消息总线,其他微服务可以订阅这些消息并进行处理。使用事件驱动架构时,微服务可以触发事件并将事件传递给其他微服务进行处理。
消息驱动可以帮助微服务实现异步通信、解耦和灵活性,但也需要注意系统的可靠性和数据一致性。因此,需要在设计和实现消息驱动时考虑如何处理异常情况和保证数据的正确性。
Spring Cloud Stream是一个用于构建基于消息驱动的微服务架构的框架。它是基于Spring Boot构建的,并且提供了一个简单的编程模型来连接消息代理和微服务应用程序。
Spring Cloud Stream通过定义消息通道和绑定器来抽象消息代理的实现细节,这样就使得应用程序可以适配不同的消息代理,如Apache Kafka、RabbitMQ和Amazon Kinesis等。开发者只需要关注于业务逻辑的实现,而不需要关心底层消息代理的细节。
使用Spring Cloud Stream可以有效地解耦微服务之间的依赖,提高系统的可伸缩性和可维护性。它还支持多种消息序列化和反序列化方式,以及消息的压缩、分区和路由等特性,使得应用程序可以根据实际需求得到更好的性能和可用性。
Spring Cloud Stream是一个非常强大的消息驱动框架,能够大大简化消息驱动应用程序的开发和运维工作。
组成 | 说明 |
---|---|
Middleware | 中间件,支持 RabbitMQ 和 Kafka。 |
Binder | 目标绑定器,目标指的是 Kafka 还是 RabbitMQ。绑定器就是封装了目标中间件的包。如果操作的是 Kafka 就使用 spring-cloud-stream-binder-kafka,如果操作的是 RabbitMQ 就使用 spring-cloud-stream-binder-rabbit。 |
@Input | 注解标识输入通道,接收(消息消费者)的消息将通过该通道进入应用程序。 |
@Output | 注解标识输出通道,发布(消息生产者)的消息将通过该通道离开应用程序。 |
@StreamListener | 监听队列,消费者的队列的消息接收。 |
@EnableBinding | 注解标识绑定,将信道 channel 和交换机 exchange 绑定在一起。 |
父依赖如下:
4.0.0 com.itheima stream-parent pom 1.0-SNAPSHOT stream-producer stream-consumer org.springframework.boot spring-boot-starter-parent 2.1.0.RELEASE UTF-8 UTF-8 1.8 Greenwich.RELEASE org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import
1、创建消息生产者模块,引入依赖 starter-stream-rabbit
stream-parent com.itheima 1.0-SNAPSHOT 4.0.0 stream-producer org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-stream-rabbit
2、编写配置,定义 binder,和 bingings
server: port: 8000 spring: cloud: stream: # 定义绑定器,绑定到哪个消息中间件上 binders: itheima_binder: # 自定义的绑定器名称 type: rabbit # 绑定器类型 environment: # 指定mq的环境 spring: rabbitmq: host: localhost # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 bindings: output: # channel名称 binder: itheima_binder #指定使用哪一个binder destination: itheima_exchange # 消息目的地
3、定义消息发送业务类。添加 @EnableBinding(Source.class),注入MessageChannel output ,完成消息发送
package com.itheima.stream.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ // @Component // @EnableBinding(Source.class) // public class MessageProducer { // @Autowired // private Source source; // /** // * 发送消息 // * // * @param message // */ // public void send(String message) { // source.output().send(MessageBuilder.withPayload(message).build()); // } // } @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private MessageChannel output; public void send(){ String msessage = "hello stream~~~"; //发送消息 output.send(MessageBuilder.withPayload(msessage).build()); System.out.println("消息发送成功~~~"); } }
1、创建消息消费者模块,引入依赖 starter-stream-rabbit
stream-parent com.itheima 1.0-SNAPSHOT 4.0.0 stream-consumer org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-stream-rabbit
2、 编写配置,定义 binder,和 bingings
server: port: 9000 spring: cloud: stream: # 定义绑定器,绑定到哪个消息中间件上 binders: itheima_binder: # 自定义的绑定器名称 type: rabbit # 绑定器类型 environment: # 指定mq的环境 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / bindings: input: # channel名称 binder: itheima_binder #指定使用哪一个binder destination: itheima_exchange # 消息目的地
3、 定义消息接收业务类。添加 @EnableBinding(Sink.class),使用@StreamListener(Sink.INPUT),完成消息接收。
package com.itheima.stream.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * 消息接收类 */ // @Component // @EnableBinding(Sink.class) // public class MessageConsumer { // /** // * 接收消息 // * // * @param message // */ // @StreamListener(Sink.INPUT) // public void receive(String message) { // System.out.println("message = " + message); // } // } @EnableBinding({Sink.class}) @Component public class MessageListener { @StreamListener(Sink.INPUT) public void receive(Message message){ System.out.println(message); System.out.println(message.getPayload()); } }
1、自定义消息发送通道 MySource.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource { String MY_OUTPUT = "my_output"; @Output(MY_OUTPUT) MessageChannel myOutput(); }
2、自定义消息接收通道 MySink.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink { String MY_INPUT = "my_input"; @Input(MY_INPUT) SubscribableChannel myInput(); }
1、生产者配置文件
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 my_output: destination: my.message # 绑定的交换机名称
2、消费者配置文件
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 my_input: destination: my.message # 绑定的交换机名称
1、生产者
package com.example.producer; import com.example.channel.MySource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource.class) public class MyMessageProducer { @Autowired private MySource mySource; /** * 发送消息 * * @param message */ public void send(String message) { mySource.myOutput().send(MessageBuilder.withPayload(message).build()); } }
2、消费者
package com.example.consumer; import com.example.channel.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink.class) public class MyMessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(MySink.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }
1、自定义消息发送通道 MySource02.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource02 { String MY_OUTPUT = "default.message"; @Output(MY_OUTPUT) MessageChannel myOutput(); }
2、自定义消息接收通道 MySink02.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink02 { String MY_INPUT = "default.message"; @Input(MY_INPUT) SubscribableChannel myInput(); }
1、生产者配置文件
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
2、消费者配置文件
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
1、消息生产者 MyMessageProducer02.java
package com.example.producer; import com.example.channel.MySource02; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource02.class) public class MyMessageProducer02 { @Autowired private MySource02 mySource02; /** * 发送消息 * * @param message */ public void send(String message) { mySource02.myOutput().send(MessageBuilder.withPayload(message).build()); } }
2、消息消费者 MyMessageConsumer02.java
package com.example.consumer; import com.example.channel.MySink02; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink02.class) public class MyMessageConsumer02 { /** * 接收消息 * * @param message */ @StreamListener(MySink02.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }
1、自定义消息发送通道
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息通道 */ public interface MyProcessor { String SOURCE_MESSAGE = "source.message"; String SMS_MESSAGE = "sms.message"; String EMAIL_MESSAGE = "email.message"; @Output(SOURCE_MESSAGE) MessageChannel sourceOutput(); @Input(SMS_MESSAGE) SubscribableChannel smsInput(); @Input(EMAIL_MESSAGE) SubscribableChannel emailInput(); }
2、自定义消息接收通道
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息通道 */ public interface MyProcessor { String SOURCE_MESSAGE = "source.message"; String SMS_MESSAGE = "sms.message"; String EMAIL_MESSAGE = "email.message"; @Input(SOURCE_MESSAGE) MessageChannel sourceOutput(); @Output(SMS_MESSAGE) SubscribableChannel smsOutput(); @Output(EMAIL_MESSAGE) SubscribableChannel emailOutput(); }
1、生产者配置文件
spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
2、消费者配置文件
spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
1、发送消息
package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageProducer { private Logger logger = LoggerFactory.getLogger(SourceMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送原始消息 * * @param sourceMessage */ public void send(String sourceMessage) { logger.info("原始消息发送成功,原始消息为:{}", sourceMessage); myProcessor.sourceOutput().send(MessageBuilder.withPayload(sourceMessage).build()); } }
2、接收消息
package com.example.consumer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageConsumer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageConsumer.class); /** * 接收消息 电话号码 * * @param phoneNum */ @StreamListener(MyProcessor.SMS_MESSAGE) public void receiveSms(String phoneNum) { logger.info("电话号码为:{},调用短信发送服务,发送短信...", phoneNum); } /** * 接收消息 邮箱地址 * * @param emailAddress */ @StreamListener(MyProcessor.EMAIL_MESSAGE) public void receiveEmail(String emailAddress) { logger.info("邮箱地址为:{},调用邮件发送服务,发送邮件...", emailAddress); } }
1、接收消息
package com.example.consumer; import com.example.channel.MyProcessor; import com.example.producer.SmsAndEmailMessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageConsumer { private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class); @Autowired private SmsAndEmailMessageProducer smsAndEmailMessageProducer; /** * 接收原始消息,处理后并发送 * * @param sourceMessage */ @StreamListener(MyProcessor.SOURCE_MESSAGE) public void receive(String sourceMessage) { logger.info("原始消息接收成功,原始消息为:{}", sourceMessage); // 发送消息 电话号码 smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]); // 发送消息 邮箱地址 smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]); } }
2、发送消息
package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageProducer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送消息 电话号码 * * @param smsMessage */ public void sendSms(String smsMessage) { logger.info("电话号码消息发送成功,消息为:{}", smsMessage); myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build()); } /** * 发送消息 邮箱地址 * * @param emailMessage */ public void sendEmail(String emailMessage) { logger.info("邮箱地址消息发送成功,消息为:{}", emailMessage); myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build()); } }
消息驱动微服务 A 控制台打印结果如下:
电话号码为:10086,调用短信发送服务,发送短信... 邮箱地址为:10086@email.com,调用邮件发送服务,发送邮件...
消息驱动微服务 B 控制台打印结果如下:
原始消息接收成功,原始消息为:10086|10086@email.com 电话号码消息发送成功,消息为:10086 邮箱地址消息发送成功,消息为:10086@email.com
1、配置分组
消费者1
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A
消费者2
server: port: 8003 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A
运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费。
1、生产者
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 producer: partition-key-expression: payload # 配置分区键的表达式规则 partition-count: 2 # 配置消息分区的数量
package org.springframework.messaging; @FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1L; default boolean send(Message> message) { return this.send(message, -1L); } boolean send(Message> var1, long var2); }
package org.springframework.messaging.support; import java.io.Serializable; import java.util.Map; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; public class GenericMessageimplements Message , Serializable { private static final long serialVersionUID = 4268801052358035098L; private final T payload; private final MessageHeaders headers; ... }
package com.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private Source source; /** * 发送消息 * * @param message */ public void send(String message) { source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build()); } }
2、消费者
消费者1
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: instance-count: 2 # 消费者总数 instance-index: 0 # 当前消费者的索引 bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A consumer: partitioned: true # 开启分区支持
消费者2
server: port: 8003 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: instance-count: 2 # 消费者总数 instance-index: 1 # 当前消费者的索引 bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 group: group-A consumer: partitioned: true # 开启分区支持
运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费所有消息。
亲爱的读者,
我在这篇文章中投入了大量的心血和时间,希望为您提供有价值的内容。这篇文章包含了深入的研究和个人经验,我相信这些信息对您非常有帮助。
如果您觉得这篇文章对您有所帮助,我诚恳地请求您考虑赞赏1元钱的支持。这个金额不会对您的财务状况造成负担,但它会对我继续创作高质量的内容产生积极的影响。
我之所以写这篇文章,是因为我热爱分享有用的知识和见解。您的支持将帮助我继续这个使命,也鼓励我花更多的时间和精力创作更多有价值的内容。
如果您愿意支持我的创作,请扫描下面二维码,您的支持将不胜感激。同时,如果您有任何反馈或建议,也欢迎与我分享。
再次感谢您的阅读和支持!
最诚挚的问候, “愚公搬代码”