实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq
作者:mmseoamin日期:2023-12-14

文章目录

    • 前言
    • 实战要点
    • 技术积累
      • Spring Cloud Stream简介
      • 集成kafka要点
      • 集成rabbitmq要点
      • 实战演示
        • Maven依赖版本号选择
        • Spring及MQ主要配置
        • 基础信道
        • 绑定信道消息发送
        • 集成兼容多mq演示
          • Rabbitmq演示
          • Kafka演示
          • 写在最后

            前言

            前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多个消息中间件和多种消息中间件的替换。今天,我们就在一个项目中用Spring Cloud Stream 集成两个消息中间件kafka和rabbitmq。

            实战要点

            1、完美集成并兼容kafka和rabbitmq

            2、增加消费组概念,直接保证消息唯一消费

            3、增加重试机制,重试条件满足后自动加入死信

            4、增加死信消费者,可以直接移植生产

            5、消费者手动ack、offset

            6、rabbitmq、kafka配置,保证消息不丢失

            技术积累

            Spring Cloud Stream简介

            Spring Cloud Stream是用于构建微服务具有消息驱动能力的框架,应用程序通过inputs、outputs通道与binder进行交互,binder与消息中间件进行通信。

            binder的作用是将消息中间件进行粘合,相当于对第三方中间件进行封装整合,让开发人员不用关心底层消息中间件如何运行。

            实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq,在这里插入图片描述,第1张

            inputs是消息输入通道,类似于消息中间件的consumer消费者;outputs是消息输出通道,类似于消息中间件的producer生产者。应用程序收发消息不再直接调用消息中间件的接口或者逻辑代码,直接使用Spring Cloud Stream 的OUTPUT与INPUT通道进行处理。

            可以通过binder绑定选用各种消息中间件,用binding进行中间件的相关参数配置,让应用程序达到灵活配置和切换消息中间件的目的。

            集成kafka要点

            1、修改server.properties文件,将#listeners=PLAINTEXT://:9092这一句注释放开,改为listeners=PLAINTEXT://kafka服务器ip:9092

            如果此处不改SpringBoot在启动时会报错:

            Error connecting to node devops-01:9092 (id: 0 rack: null)

            2、kafka 2.8版本开始自带zk,建议使用2.8版本以上的版本不用安装zk

            3、spring-boot-starter-paren与spring-cloud-starter-stream-kafk版本号一定要对应上,特别是springboot2之后的版本。如果没有特殊要求,需严格按照本文的版本号进行配置和实战

            4、kafka本身、生产者、消费者保证消息不丢失,注意必须使用kafka HA配合修改配置

            集成rabbitmq要点

            1、rabbitmq比kafka的限制条件就少很多,基本上不用考虑spring版本号兼容

            2、rabbimq本身、生产者、消费者保证消息不丢失,注意必须使用rabbitmq HA

            实战演示

            本次实战直接采用从0到1的策略进行演示,适合小白直接入手,可直接接入生产

            本次实战MQ组件全部采用单机进行测试,生产环境请更换为HA

            本次实战提供:

            1、Kafka、Rabbitmq消息中间件信道注册

            2、Kafka、Rabbitm消息中间件消息发送、接收消息监听、死信消息监听

            Maven依赖版本号选择

            
                org.springframework.boot
                spring-boot-starter-parent
                2.3.12.RELEASE
                
            
            
                1.8
                Hoxton.SR10
            
            
                
                    org.springframework.cloud
                    spring-cloud-starter-stream-rabbit
                
                
                    org.springframework.cloud
                    spring-cloud-starter-stream-kafka
                    3.0.3.RELEASE
                
            
            
                
                    
                        org.springframework.cloud
                        spring-cloud-dependencies
                        ${spring-cloud.version}
                        pom
                        import
                    
                
            
            

            Spring及MQ主要配置

            server:
              port: 9999
            spring:
              rabbitmq:
                host: 10.10.22.187
                port: 5672
                username: admin
                password: admin
                virtual-host: /
              kafka:
                bootstrap-servers: 10.10.22.174:9092
              cloud:
                stream:
                  default-binder: myRabbit #默认绑定的mq
                  binders: #stream框架粘接的mq
                    myRabbit: #自定义个人mq名称
                      type: rabbit
                      environment:
                        spring: ${spring.rabbitmq}
                    myKafka:
                      type: kafka
                      environment:
                        spring:
                          cloud:
                            stream:
                              kafka: ${spring.cloud.stream.kafka.binder}
                  bindings: #stream绑定信道
                    output_channel: #自定义发送信道名称
                      destination: assExchange #目的地 交换机/主题
                      content-type: application/json
                      binder: myRabbit #粘接到的mq
                    input_channel: #自定义接收信道
                      destination: assExchange #目的地 交换机/主题
                      content-type: application/json
                      binder: myRabbit #粘接到的mq
                      group: assGroup
                      consumer:
                        maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
                        backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
                        backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
                        backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
                    output_kafka_channel: #自定义发送信道名称
                      destination: assTopic #目的地 交换机/主题
                      content-type: text/plain
                      binder: myKafka #粘接到的mq
                      producer:
                        partition-count: 2 #分区数目
                    input_kafka_channel: #自定义接收信道
                      destination: assTopic #目的地 交换机/主题
                      content-type: text/plain
                      binder: myKafka #粘接到的mq
                      group: assGroup
                      consumer:
                        maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
                        backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
                        backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
                        backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
                  rabbit: #stream mq配置
                    bindings:
                      out_channel:
                        producer:
                          delivery-mode: persistent   #消息持久化 non-persistent
                          useConfirmHeader: true #Future获取异常投递,与confirmAckChannel互斥
                      input_channel:
                        consumer:
                          concurrency: 1 #消费者数量
                          max-concurrency: 5 #最大消费者数量
                          durable-subscription: true  #持久化队列
                          recovery-interval: 3000  #3s 重连
                          acknowledge-mode: MANUAL  #手动
                          requeue-rejected: false #是否重新放入队列
                          auto-bind-dlq: true #开启死信队列
                          requeueRejected: true #异常放入死信
                  kafka:
                    binder:
                      brokers: ${spring.kafka.bootstrap-servers}
                      auto-add-partitions: true #自动分区
                      auto-create-topics: true #自动创建主题
                      replication-factor: 1 #两个副本
                      min-partition-count: 1 #最小分区
                    bindings:
                      out_kafka_channel:
                        producer:
                          # 无限制重发不产生消息丢失
                          retries: Integer.MAX_VALUE
                          #acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低
                          #acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中
                          #acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
                          #可以设置的值为:all, -1, 0, 1
                          acks: all
                          min:
                            insync:
                              replicas: 1 #感知副本数
                      input_kafka_channel:
                        consumer:
                          concurrency: 1 #消费者数量
                          max-concurrency: 5 #最大消费者数量
                          recovery-interval: 3000  #3s 重连
                          auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡
                          auto-commit-offset: false   #手动提交偏移量
                          enable-dlq: true  # 开启 dlq队列
                          dlq-name: assTopic.dlq
                          deserializationExceptionHandler: sendToDlq #异常加入死信
            

            基础信道

            /**
             * MqChannel
             * @author senfel
             * @version 1.0
             * @date 2023/6/2 15:46
             */
            public interface MqChannel {
                /**
                 * 消息目的地
                 * RabbitMQ中为交换机名称
                 * kafka topic
                 */
                String DESTINATION = "assExchange";
                String DESTINATIONBYGROUP = "assGroup";
                String DESTINATIONBYTOPIC = "assTopic";
                /**
                 * 输出信道
                 */
                String OUTPUT_CHANNEL = "output_channel";
                String OUTPUT_KAFKA_CHANNEL = "output_kafka_channel";
                /**
                 * 输入信道
                 */
                String INPUT_CHANNEL = "input_channel";
                String INPUT_KAFKA_CHANNEL = "input_kafka_channel";
                String INPUT_KAFKA_CHANNEL_ERROR = "assTopic.dlq";
                /**
                 * 死信队列
                 */
                String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";
                @Output(MqChannel.OUTPUT_CHANNEL)
                MessageChannel output();
                @Output(MqChannel.OUTPUT_KAFKA_CHANNEL)
                MessageChannel outputByKafka();
                @Input(MqChannel.INPUT_CHANNEL)
                SubscribableChannel input();
                @Input(MqChannel.INPUT_KAFKA_CHANNEL)
                SubscribableChannel inputByKafka();
                @Input(MqChannel.INPUT_KAFKA_CHANNEL_ERROR)
                SubscribableChannel inputByKafkaError();
            }
            

            绑定信道消息发送

            提供绑定信道,增加rabbitmq、kafka发消息逻辑

            1、启动类增加绑定mq注解@EnableBinding(MqChannel.class)

            @SpringBootApplication
            @EnableBinding(MqChannel.class)
            public class TestDemoApplication {
                public static void main(String[] args) {
                    SpringApplication.run(TestDemoApplication.class, args);
                }
            }
            

            2、增加发送消息接口

            /**
             * TestMQService
             * @author senfel
             * @version 1.0
             * @date 2023/6/2 15:47
             */
            public interface TestMQService {
                /**
                 * rabbitmq发送消息
                 */
                void send(String str);
                /**
                 * kafka发送消息
                 */
                void sendByKafka(String str);
            }
            

            3、实现发送消息接口

            /**
             * TestMQServiceImpl
             * @author senfel
             * @version 1.0
             * @date 2023/6/2 15:49
             */
            @Service
            @Slf4j
            public class TestMQServiceImpl implements TestMQService {
                @Resource
                private MqChannel mqChannel;
                @Override
                public void send(String str) {
                    mqChannel.output().send(MessageBuilder.withPayload("rabbitmq测试:"+str).build());
                }
                @Override
                public void sendByKafka(String str) {
                    mqChannel.outputByKafka().send(MessageBuilder.withPayload("kafka测试:"+str).build());
                }
            }
            

            4、提供接口层

            /**
             * @author senfel
             * @version 1.0
             * @date 2023/6/2 17:27
             */
            @RestController
            public class TestController{
                @Resource
                private TestMQService testMQService;
                /**
                 * testRabbitmq
                 * @param str
                 * @author senfel
                 * @date 2023/6/8 11:27 
                 * @return java.lang.String
                 */
                @GetMapping("/test")
                public String testMq(String str){
                    testMQService.send(str);
                    return str;
                }
                /**
                 * testKafka
                 * @param str
                 * @author senfel
                 * @date 2023/6/8 11:27 
                 * @return java.lang.String
                 */
                @GetMapping("/testKafka")
                public String testKafka(String str){
                    testMQService.sendByKafka(str);
                    return str;
                }
            }
            

            集成兼容多mq演示

            Rabbitmq演示

            1、TestMQServiceImpl增加mq消息监听和私信监听

             /**
             * 接收消息监听
             * @param message 消息体
             * @param channel 信道
             * @param tag 标签
             * @author senfel
             * @date 2023/6/5 9:25
             * @return void
             */
            @StreamListener(MqChannel.INPUT_CHANNEL)
            public void process(String message,
                                @Header(AmqpHeaders.CHANNEL) Channel channel,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
                log.info("message : "+message);
                if(message.contains("9")){
                    // 参数1为消息的tag  参数2为是否多条处理 参数3为是否重发
                    //channel.basicNack(tag,false,false);
                    System.err.println("--------------rabbitmq消费者消费异常--------------------------------------");
                    System.err.println(message);
                    throw new RuntimeException("消费异常");
                }else{
                    System.err.println("--------------rabbitmq消费者--------------------------------------");
                    System.err.println(message);
                    channel.basicAck(tag,false);
                }
            }
            /**
             * 死信监听
             * @param message 消息体
             * @param channel 信道
             * @param tag 标签
             * @author senfel
             * @date 2023/6/5 14:30
             * @return void
             */
            @RabbitListener(
                    bindings = @QueueBinding(
                            value = @Queue(MqChannel.INPUT_CHANNEL_DLQ)
                            , exchange = @Exchange(MqChannel.DESTINATION)
                    ),
                    concurrency = "1-5"
            )
            public void processByDlq(String message,
                                     @Header(AmqpHeaders.CHANNEL) Channel channel,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
                log.info("message : "+message);
                System.err.println("---------------rabbitmq死信消费者------------------------------------");
                System.err.println(message);
            }
            

            2、测试正常消息投递

            实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq,在这里插入图片描述,第2张

            --------------rabbitmq消费者--------------------------------------

            rabbitmq测试:777777777777777

            3、测试异常消息投递,投递规则3次消费失败直接进入死信

            实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq,在这里插入图片描述,第3张

            --------------rabbitmq消费者消费异常--------------------------------------

            rabbitmq测试:7777777777777779

            --------------rabbitmq消费者消费异常--------------------------------------

            rabbitmq测试:7777777777777779

            --------------rabbitmq消费者消费异常--------------------------------------

            rabbitmq测试:7777777777777779

            ---------------rabbitmq死信消费者------------------------------------

            rabbitmq测试:7777777777777779

            Kafka演示

            1、TestMQServiceImpl增加mq消息监听和私信监听

               /**
                 * kafka消费者
                 * @param message 消息体
                 * @param acknowledgment ack
                 * @param receivedTopic topic
                 * @param groupId 消费者group
                 * @author senfel
                 * @date 2023/6/7 15:59
                 * @return void
                 */
                @StreamListener(MqChannel.INPUT_KAFKA_CHANNEL)
                public void processByKafka(String message,
                                           @Header(value = KafkaHeaders.ACKNOWLEDGMENT,required = false) Acknowledgment acknowledgment,
                                           @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic,
                                           @Header(value = KafkaHeaders.GROUP_ID,required = false) String groupId,
                                           @Header(value = KafkaHeaders.PARTITION_ID,required = false) String partitionId) throws Exception {
                    System.err.println("-------进入kafka消费者---------------");
                    System.err.println(message);
                    System.err.println(receivedTopic);
                    if(message.contains("9")){
                        log.error("kafka消费异常:{}",message);
                        System.err.println("kafka1消费异常"+message);
                        throw new RuntimeException("kafka消费异常");
                    }
                    System.err.println("kafka接受的数据为"+message);
                    acknowledgment.acknowledge();
                }
                /**
                 * kafka死信消费
                 * @param message 消息体
                 * @param receivedTopic  topic
                 * @author senfel
                 * @date 2023/6/7 15:58
                 * @return void
                 */
               @KafkaListener(topics = {MqChannel.INPUT_KAFKA_CHANNEL_ERROR},
                                groupId = MqChannel.DESTINATIONBYGROUP)
                public void processByKafkaError(String message,
                                                @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) throws Exception {
                    System.err.println("-------进入死信消费者---------------");
                    System.err.println(message);
                    System.err.println(receivedTopic);
                    System.err.println("kafka死信接受的数据为"+message);
                    System.err.println(message);
                }
            

            2、测试正常消息投递

            实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq,在这里插入图片描述,第4张

            -------进入kafka消费者---------------

            kafka测试:7777777777777777

            assTopic

            kafka接受的数据为kafka测试:7777777777777777

            3、测试异常消息投递,投递规则3次消费失败直接进入死信

            实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq,在这里插入图片描述,第5张

            -------进入kafka消费者---------------

            kafka测试:7777777777777779

            assTopic

            kafka1消费异常kafka测试:7777777777777779

            -------进入kafka消费者---------------

            kafka测试:7777777777777779

            assTopic

            kafka1消费异常kafka测试:7777777777777779

            -------进入kafka消费者---------------

            kafka测试:7777777777777779

            assTopic

            kafka1消费异常kafka测试:7777777777777779

            -------进入死信消费者---------------

            kafka测试:7777777777777779

            assTopic.dlq

            kafka死信接受的数据为kafka测试:7777777777777779

            kafka测试:7777777777777779

            写在最后

            Spring Cloud Stream集成多消息中间件kafka、rabbitmq较为简单,直接省去了原生中间的的操作与处理,开发人员可以直接任意切换和混用多种消息中间件,大大增加架构的可用性与可移植性。本实战案例提供重试、私信、手动ack、消费者分组和负载等高可用方案,直接可接入生产使用。

            ⭐️路漫漫其修远兮,吾将上下而求索 🔍