【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动)
作者:mmseoamin日期:2023-12-14

【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动),在这里插入图片描述,第1张

🏆 作者简介,愚公搬代码

🏆《头衔》:华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,51CTO博客专家等。

🏆《近期荣誉》:2022年CSDN博客之星TOP2,2022年华为云十佳博主等。

🏆《博客内容》:.NET、Java、Python、Go、Node、前端、IOS、Android、鸿蒙、Linux、物联网、网络安全、大数据、人工智能、U3D游戏、小程序等相关领域知识。

🏆🎉欢迎 👍点赞✍评论⭐收藏

文章目录

  • 🚀前言
  • 🚀一、Spring Cloud Stream消息驱动
    • 🔎1.概述
    • 🔎2.组件
    • 🔎3.基本使用
      • 🦋3.1 Stream 消息生产者
      • 🦋3.2 Stream 消息消费者
      • 🔎4.自定义消息通道
        • 🦋4.1 创建消息通道
        • 🦋4.2 配置文件
        • 🦋4.3 代码重构
        • 🔎5.配置优化
          • 🦋5.1 创建消息通道
          • 🦋5.2 配置文件
          • 🦋5.3 代码重构
          • 🔎6.案例
            • 🦋6.1 创建消息通道
            • 🦋6.2 配置文件
            • 🦋6.3 消息驱动微服务A
            • 🦋6.4 消息驱动微服务B
            • 🔎7.消息分组
            • 🔎8.消息分区
            • 🚀感谢:给读者的一封信

              🚀前言

              微服务中的消息驱动是一种基于消息传递的方式,用于实现微服务之间的通信和协作。它通过异步发送和接收消息,可以降低系统耦合度、提高系统可扩展性和可靠性。

              常用的消息驱动技术包括消息队列、消息总线和事件驱动架构。使用消息队列时,微服务可以将消息发送到队列中,其他微服务可以订阅这些消息并进行处理。使用消息总线时,微服务可以将消息发布到消息总线,其他微服务可以订阅这些消息并进行处理。使用事件驱动架构时,微服务可以触发事件并将事件传递给其他微服务进行处理。

              消息驱动可以帮助微服务实现异步通信、解耦和灵活性,但也需要注意系统的可靠性和数据一致性。因此,需要在设计和实现消息驱动时考虑如何处理异常情况和保证数据的正确性。

              🚀一、Spring Cloud Stream消息驱动

              🔎1.概述

              Spring Cloud Stream是一个用于构建基于消息驱动的微服务架构的框架。它是基于Spring Boot构建的,并且提供了一个简单的编程模型来连接消息代理和微服务应用程序。

              Spring Cloud Stream通过定义消息通道绑定器来抽象消息代理的实现细节,这样就使得应用程序可以适配不同的消息代理,如Apache Kafka、RabbitMQ和Amazon Kinesis等。开发者只需要关注于业务逻辑的实现,而不需要关心底层消息代理的细节。

              使用Spring Cloud Stream可以有效地解耦微服务之间的依赖,提高系统的可伸缩性和可维护性。它还支持多种消息序列化和反序列化方式,以及消息的压缩、分区和路由等特性,使得应用程序可以根据实际需求得到更好的性能和可用性。

              Spring Cloud Stream是一个非常强大的消息驱动框架,能够大大简化消息驱动应用程序的开发和运维工作。

              🔎2.组件

              组成说明
              Middleware中间件,支持 RabbitMQ 和 Kafka。
              Binder目标绑定器,目标指的是 Kafka 还是 RabbitMQ。绑定器就是封装了目标中间件的包。如果操作的是 Kafka 就使用 spring-cloud-stream-binder-kafka,如果操作的是 RabbitMQ 就使用 spring-cloud-stream-binder-rabbit。
              @Input注解标识输入通道,接收(消息消费者)的消息将通过该通道进入应用程序。
              @Output注解标识输出通道,发布(消息生产者)的消息将通过该通道离开应用程序。
              @StreamListener监听队列,消费者的队列的消息接收。
              @EnableBinding注解标识绑定,将信道 channel 和交换机 exchange 绑定在一起。

              【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动),在这里插入图片描述,第2张

              🔎3.基本使用

              父依赖如下:

              
              
                  4.0.0
                  com.itheima
                  stream-parent
                  pom
                  1.0-SNAPSHOT
                  
                      stream-producer
                      stream-consumer
                  
                  
                      org.springframework.boot
                      spring-boot-starter-parent
                      2.1.0.RELEASE
                       
                  
                  
                      UTF-8
                      UTF-8
                      1.8
                      Greenwich.RELEASE
                  
                  
                      
                          
                              org.springframework.cloud
                              spring-cloud-dependencies
                              ${spring-cloud.version}
                              pom
                              import
                          
                      
                  
              
              

              🦋3.1 Stream 消息生产者

              1、创建消息生产者模块,引入依赖 starter-stream-rabbit

              
              
                  
                      stream-parent
                      com.itheima
                      1.0-SNAPSHOT
                  
                  4.0.0
                  stream-producer
                  
                      
                      
                          org.springframework.boot
                          spring-boot-starter-web
                      
                      
                      
                          org.springframework.cloud
                          spring-cloud-starter-stream-rabbit
                      
                  
              
              

              2、编写配置,定义 binder,和 bingings

              server:
                port: 8000
              spring:
                cloud:
                  stream:
                    # 定义绑定器,绑定到哪个消息中间件上
                    binders:
                      itheima_binder: # 自定义的绑定器名称
                        type: rabbit # 绑定器类型
                        environment: # 指定mq的环境
                          spring:
                            rabbitmq:
                              host: localhost       # 服务器 IP
              			    port: 5672            # 服务器端口
              			    username: guest       # 用户名
              			    password: guest       # 密码
              			    virtual-host: /       # 虚拟主机地址
                    bindings:
                      output: # channel名称
                        binder: itheima_binder #指定使用哪一个binder
                        destination: itheima_exchange # 消息目的地
              

              3、定义消息发送业务类。添加 @EnableBinding(Source.class),注入MessageChannel output ,完成消息发送

              package com.itheima.stream.producer;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.messaging.Source;
              import org.springframework.messaging.MessageChannel;
              import org.springframework.messaging.support.MessageBuilder;
              import org.springframework.stereotype.Component;
              /**
               * 消息生产者
               */
              // @Component
              // @EnableBinding(Source.class)
              // public class MessageProducer {
              //     @Autowired
              //     private Source source;
              //     /**
              //      * 发送消息
              //      *
              //      * @param message
              //      */
              //     public void send(String message) {
              //         source.output().send(MessageBuilder.withPayload(message).build());
              //     }
              // }
              @Component
              @EnableBinding(Source.class)
              public class MessageProducer {
                  @Autowired
                  private MessageChannel output;
                  public void send(){
                      String msessage = "hello stream~~~";
                      //发送消息
                      output.send(MessageBuilder.withPayload(msessage).build());
                      System.out.println("消息发送成功~~~");
                  }
              }
              

              【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动),在这里插入图片描述,第3张

              🦋3.2 Stream 消息消费者

              1、创建消息消费者模块,引入依赖 starter-stream-rabbit

              
              
                  
                      stream-parent
                      com.itheima
                      1.0-SNAPSHOT
                  
                  4.0.0
                  stream-consumer
                  
                      
                      
                          org.springframework.boot
                          spring-boot-starter-web
                      
                      
                      
                          org.springframework.cloud
                          spring-cloud-starter-stream-rabbit
                      
                  
              
              

              2、 编写配置,定义 binder,和 bingings

              server:
                port: 9000
              spring:
                cloud:
                  stream:
                    # 定义绑定器,绑定到哪个消息中间件上
                    binders:
                      itheima_binder: # 自定义的绑定器名称
                        type: rabbit # 绑定器类型
                        environment: # 指定mq的环境
                          spring:
                            rabbitmq:
                              host: localhost
                              port: 5672
                              username: guest
                              password: guest
                              virtual-host: /
                    bindings:
                      input: # channel名称
                        binder: itheima_binder #指定使用哪一个binder
                        destination: itheima_exchange # 消息目的地
              

              3、 定义消息接收业务类。添加 @EnableBinding(Sink.class),使用@StreamListener(Sink.INPUT),完成消息接收。

              package com.itheima.stream.consumer;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.annotation.StreamListener;
              import org.springframework.cloud.stream.messaging.Sink;
              import org.springframework.messaging.Message;
              import org.springframework.stereotype.Component;
              /**
               * 消息接收类
               */
               // @Component
              // @EnableBinding(Sink.class)
              // public class MessageConsumer {
              //     /**
              //      * 接收消息
              //      *
              //      * @param message
              //      */
              //     @StreamListener(Sink.INPUT)
              //     public void receive(String message) {
              //         System.out.println("message = " + message);
              //     }
              // }
              @EnableBinding({Sink.class})
              @Component
              public class MessageListener {
                  @StreamListener(Sink.INPUT)
                  public void receive(Message message){
                      System.out.println(message);
                      System.out.println(message.getPayload());
                  }
              }
              

              【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动),在这里插入图片描述,第4张

              🔎4.自定义消息通道

              🦋4.1 创建消息通道

              1、自定义消息发送通道 MySource.java

              package com.example.channel;
              import org.springframework.cloud.stream.annotation.Output;
              import org.springframework.messaging.MessageChannel;
              /**
               * 自定义消息发送通道
               */
              public interface MySource {
                  String MY_OUTPUT = "my_output";
                  @Output(MY_OUTPUT)
                  MessageChannel myOutput();
              }
              

              2、自定义消息接收通道 MySink.java

              package com.example.channel;
              import org.springframework.cloud.stream.annotation.Input;
              import org.springframework.messaging.SubscribableChannel;
              /**
               * 自定义消息接收通道
               */
              public interface MySink {
                  String MY_INPUT = "my_input";
                  @Input(MY_INPUT)
                  SubscribableChannel myInput();
              }
              

              🦋4.2 配置文件

              1、生产者配置文件

              server:
                port: 8001 # 端口
              spring:
                application:
                  name: stream-producer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    bindings:
                      # 消息发送通道
                      # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同
                      output:
                        destination: stream.message # 绑定的交换机名称
                      my_output:
                        destination: my.message # 绑定的交换机名称
              

              2、消费者配置文件

              server:
                port: 8002 # 端口
              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    bindings:
                      # 消息接收通道
                      # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
                      input:
                        destination: stream.message # 绑定的交换机名称
                      my_input:
                        destination: my.message # 绑定的交换机名称
              

              🦋4.3 代码重构

              1、生产者

              package com.example.producer;
              import com.example.channel.MySource;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.messaging.support.MessageBuilder;
              import org.springframework.stereotype.Component;
              /**
               * 消息生产者
               */
              @Component
              @EnableBinding(MySource.class)
              public class MyMessageProducer {
                  @Autowired
                  private MySource mySource;
                  /**
                   * 发送消息
                   *
                   * @param message
                   */
                  public void send(String message) {
                      mySource.myOutput().send(MessageBuilder.withPayload(message).build());
                  }
              }
              

              2、消费者

              package com.example.consumer;
              import com.example.channel.MySink;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.annotation.StreamListener;
              import org.springframework.stereotype.Component;
              /**
               * 消息消费者
               */
              @Component
              @EnableBinding(MySink.class)
              public class MyMessageConsumer {
                  /**
                   * 接收消息
                   *
                   * @param message
                   */
                  @StreamListener(MySink.MY_INPUT)
                  public void receive(String message) {
                      System.out.println("message = " + message);
                  }
              }
              

              【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动),在这里插入图片描述,第5张

              🔎5.配置优化

              🦋5.1 创建消息通道

              1、自定义消息发送通道 MySource02.java

              package com.example.channel;
              import org.springframework.cloud.stream.annotation.Output;
              import org.springframework.messaging.MessageChannel;
              /**
               * 自定义消息发送通道
               */
              public interface MySource02 {
                  String MY_OUTPUT = "default.message";
                  @Output(MY_OUTPUT)
                  MessageChannel myOutput();
              }
              

              2、自定义消息接收通道 MySink02.java

              package com.example.channel;
              import org.springframework.cloud.stream.annotation.Input;
              import org.springframework.messaging.SubscribableChannel;
              /**
               * 自定义消息接收通道
               */
              public interface MySink02 {
                  String MY_INPUT = "default.message";
                  @Input(MY_INPUT)
                  SubscribableChannel myInput();
              }
              

              🦋5.2 配置文件

              1、生产者配置文件

              server:
                port: 8001 # 端口
              spring:
                application:
                  name: stream-producer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
              

              2、消费者配置文件

              server:
                port: 8002 # 端口
              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
              

              🦋5.3 代码重构

              1、消息生产者 MyMessageProducer02.java

              package com.example.producer;
              import com.example.channel.MySource02;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.messaging.support.MessageBuilder;
              import org.springframework.stereotype.Component;
              /**
               * 消息生产者
               */
              @Component
              @EnableBinding(MySource02.class)
              public class MyMessageProducer02 {
                  @Autowired
                  private MySource02 mySource02;
                  /**
                   * 发送消息
                   *
                   * @param message
                   */
                  public void send(String message) {
                      mySource02.myOutput().send(MessageBuilder.withPayload(message).build());
                  }
              }
              

              2、消息消费者 MyMessageConsumer02.java

              package com.example.consumer;
              import com.example.channel.MySink02;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.annotation.StreamListener;
              import org.springframework.stereotype.Component;
              /**
               * 消息消费者
               */
              @Component
              @EnableBinding(MySink02.class)
              public class MyMessageConsumer02 {
                  /**
                   * 接收消息
                   *
                   * @param message
                   */
                  @StreamListener(MySink02.MY_INPUT)
                  public void receive(String message) {
                      System.out.println("message = " + message);
                  }
              }
              

              🔎6.案例

              🦋6.1 创建消息通道

              1、自定义消息发送通道

              package com.example.channel;
              import org.springframework.cloud.stream.annotation.Input;
              import org.springframework.cloud.stream.annotation.Output;
              import org.springframework.messaging.MessageChannel;
              import org.springframework.messaging.SubscribableChannel;
              /**
               * 自定义消息通道
               */
              public interface MyProcessor {
                  String SOURCE_MESSAGE = "source.message";
                  String SMS_MESSAGE = "sms.message";
                  String EMAIL_MESSAGE = "email.message";
                  @Output(SOURCE_MESSAGE)
                  MessageChannel sourceOutput();
                  @Input(SMS_MESSAGE)
                  SubscribableChannel smsInput();
                  @Input(EMAIL_MESSAGE)
                  SubscribableChannel emailInput();
              }
              

              2、自定义消息接收通道

              package com.example.channel;
              import org.springframework.cloud.stream.annotation.Input;
              import org.springframework.cloud.stream.annotation.Output;
              import org.springframework.messaging.MessageChannel;
              import org.springframework.messaging.SubscribableChannel;
              /**
               * 自定义消息通道
               */
              public interface MyProcessor {
                  String SOURCE_MESSAGE = "source.message";
                  String SMS_MESSAGE = "sms.message";
                  String EMAIL_MESSAGE = "email.message";
                  @Input(SOURCE_MESSAGE)
                  MessageChannel sourceOutput();
                  @Output(SMS_MESSAGE)
                  SubscribableChannel smsOutput();
                  @Output(EMAIL_MESSAGE)
                  SubscribableChannel emailOutput();
              }
              

              🦋6.2 配置文件

              1、生产者配置文件

              spring:
                application:
                  name: stream-producer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
              

              2、消费者配置文件

              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
              

              🦋6.3 消息驱动微服务A

              1、发送消息

              package com.example.producer;
              import com.example.channel.MyProcessor;
              import org.slf4j.Logger;
              import org.slf4j.LoggerFactory;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.messaging.support.MessageBuilder;
              import org.springframework.stereotype.Component;
              /**
               * 消息生产者
               */
              @Component
              @EnableBinding(MyProcessor.class)
              public class SourceMessageProducer {
                  private Logger logger = LoggerFactory.getLogger(SourceMessageProducer.class);
                  @Autowired
                  private MyProcessor myProcessor;
                  /**
                   * 发送原始消息
                   *
                   * @param sourceMessage
                   */
                  public void send(String sourceMessage) {
                      logger.info("原始消息发送成功,原始消息为:{}", sourceMessage);
                      myProcessor.sourceOutput().send(MessageBuilder.withPayload(sourceMessage).build());
                  }
              }
              

              2、接收消息

              package com.example.consumer;
              import com.example.channel.MyProcessor;
              import org.slf4j.Logger;
              import org.slf4j.LoggerFactory;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.annotation.StreamListener;
              import org.springframework.stereotype.Component;
              /**
               * 消息消费者
               */
              @Component
              @EnableBinding(MyProcessor.class)
              public class SmsAndEmailMessageConsumer {
                  private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageConsumer.class);
                  /**
                   * 接收消息 电话号码
                   *
                   * @param phoneNum
                   */
                  @StreamListener(MyProcessor.SMS_MESSAGE)
                  public void receiveSms(String phoneNum) {
                      logger.info("电话号码为:{},调用短信发送服务,发送短信...", phoneNum);
                  }
                  /**
                   * 接收消息 邮箱地址
                   *
                   * @param emailAddress
                   */
                  @StreamListener(MyProcessor.EMAIL_MESSAGE)
                  public void receiveEmail(String emailAddress) {
                      logger.info("邮箱地址为:{},调用邮件发送服务,发送邮件...", emailAddress);
                  }
              }
              

              🦋6.4 消息驱动微服务B

              1、接收消息

              package com.example.consumer;
              import com.example.channel.MyProcessor;
              import com.example.producer.SmsAndEmailMessageProducer;
              import org.slf4j.Logger;
              import org.slf4j.LoggerFactory;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.annotation.StreamListener;
              import org.springframework.stereotype.Component;
              /**
               * 消息消费者
               */
              @Component
              @EnableBinding(MyProcessor.class)
              public class SourceMessageConsumer {
                  private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class);
                  @Autowired
                  private SmsAndEmailMessageProducer smsAndEmailMessageProducer;
                  /**
                   * 接收原始消息,处理后并发送
                   *
                   * @param sourceMessage
                   */
                  @StreamListener(MyProcessor.SOURCE_MESSAGE)
                  public void receive(String sourceMessage) {
                      logger.info("原始消息接收成功,原始消息为:{}", sourceMessage);
                      // 发送消息 电话号码
                      smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]);
                      // 发送消息 邮箱地址
                      smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]);
                  }
              }
              

              2、发送消息

              package com.example.producer;
              import com.example.channel.MyProcessor;
              import org.slf4j.Logger;
              import org.slf4j.LoggerFactory;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.messaging.support.MessageBuilder;
              import org.springframework.stereotype.Component;
              /**
               * 消息生产者
               */
              @Component
              @EnableBinding(MyProcessor.class)
              public class SmsAndEmailMessageProducer {
                  private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class);
                  @Autowired
                  private MyProcessor myProcessor;
                  /**
                   * 发送消息 电话号码
                   *
                   * @param smsMessage
                   */
                  public void sendSms(String smsMessage) {
                      logger.info("电话号码消息发送成功,消息为:{}", smsMessage);
                      myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build());
                  }
                  /**
                   * 发送消息 邮箱地址
                   *
                   * @param emailMessage
                   */
                  public void sendEmail(String emailMessage) {
                      logger.info("邮箱地址消息发送成功,消息为:{}", emailMessage);
                      myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build());
                  }
              }
              

              消息驱动微服务 A 控制台打印结果如下:

              电话号码为:10086,调用短信发送服务,发送短信...
              邮箱地址为:10086@email.com,调用邮件发送服务,发送邮件...
              

              消息驱动微服务 B 控制台打印结果如下:

              原始消息接收成功,原始消息为:10086|10086@email.com
              电话号码消息发送成功,消息为:10086
              邮箱地址消息发送成功,消息为:10086@email.com
              

              🔎7.消息分组

              1、配置分组

              消费者1

              server:
                port: 8002 # 端口
              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    bindings:
                      # 消息接收通道
                      # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
                      input:
                        destination: stream.message # 绑定的交换机名称
                        group: group-A
              

              消费者2

              server:
                port: 8003 # 端口
              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    bindings:
                      # 消息接收通道
                      # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
                      input:
                        destination: stream.message # 绑定的交换机名称
                        group: group-A
              

              运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费。

              🔎8.消息分区

              1、生产者

              server:
                port: 8001 # 端口
              spring:
                application:
                  name: stream-producer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    bindings:
                      # 消息发送通道
                      # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同
                      output:
                        destination: stream.message # 绑定的交换机名称
                        producer:
                          partition-key-expression: payload # 配置分区键的表达式规则
                          partition-count: 2 # 配置消息分区的数量
              
              package org.springframework.messaging;
              @FunctionalInterface
              public interface MessageChannel {
                  long INDEFINITE_TIMEOUT = -1L;
                  default boolean send(Message message) {
                      return this.send(message, -1L);
                  }
                  boolean send(Message var1, long var2);
              }
              
              package org.springframework.messaging.support;
              import java.io.Serializable;
              import java.util.Map;
              import org.springframework.lang.Nullable;
              import org.springframework.messaging.Message;
              import org.springframework.messaging.MessageHeaders;
              import org.springframework.util.Assert;
              import org.springframework.util.ObjectUtils;
              public class GenericMessage implements Message, Serializable {
                  private static final long serialVersionUID = 4268801052358035098L;
                  private final T payload;
                  private final MessageHeaders headers;
                  
                  ...
              }
              
              package com.example.producer;
              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.cloud.stream.annotation.EnableBinding;
              import org.springframework.cloud.stream.messaging.Source;
              import org.springframework.messaging.support.MessageBuilder;
              import org.springframework.stereotype.Component;
              /**
               * 消息生产者
               */
              @Component
              @EnableBinding(Source.class)
              public class MessageProducer {
                  @Autowired
                  private Source source;
                  /**
                   * 发送消息
                   *
                   * @param message
                   */
                  public void send(String message) {
                      source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build());
                  }
              }
              

              2、消费者

              消费者1

              server:
                port: 8002 # 端口
              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    instance-count: 2 # 消费者总数
                    instance-index: 0 # 当前消费者的索引
                    bindings:
                      # 消息接收通道
                      # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
                      input:
                        destination: stream.message # 绑定的交换机名称
                        group: group-A
                        consumer:
                          partitioned: true # 开启分区支持
              

              消费者2

              server:
                port: 8003 # 端口
              spring:
                application:
                  name: stream-consumer # 应用名称
                rabbitmq:
                  host: 192.168.10.101  # 服务器 IP
                  port: 5672            # 服务器端口
                  username: guest       # 用户名
                  password: guest       # 密码
                  virtual-host: /       # 虚拟主机地址
                cloud:
                  stream:
                    instance-count: 2 # 消费者总数
                    instance-index: 1 # 当前消费者的索引
                    bindings:
                      # 消息接收通道
                      # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
                      input:
                        destination: stream.message # 绑定的交换机名称
                        group: group-A
                        consumer:
                          partitioned: true # 开启分区支持
              

              运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费所有消息。


              🚀感谢:给读者的一封信

              亲爱的读者,

              我在这篇文章中投入了大量的心血和时间,希望为您提供有价值的内容。这篇文章包含了深入的研究和个人经验,我相信这些信息对您非常有帮助。

              如果您觉得这篇文章对您有所帮助,我诚恳地请求您考虑赞赏1元钱的支持。这个金额不会对您的财务状况造成负担,但它会对我继续创作高质量的内容产生积极的影响。

              我之所以写这篇文章,是因为我热爱分享有用的知识和见解。您的支持将帮助我继续这个使命,也鼓励我花更多的时间和精力创作更多有价值的内容。

              如果您愿意支持我的创作,请扫描下面二维码,您的支持将不胜感激。同时,如果您有任何反馈或建议,也欢迎与我分享。

              【愚公系列】2023年11月 Java教学课程 196-SpringCloud(Spring Cloud Stream消息驱动),在这里插入图片描述,第6张

              再次感谢您的阅读和支持!

              最诚挚的问候, “愚公搬代码”