Spring Boot 整合RabbitMQ
作者:mmseoamin日期:2023-12-11

系列文章目录

第一章 Java线程池技术应用

第二章 CountDownLatch和Semaphone的应用

第三章 Spring Cloud 简介

第四章 Spring Cloud Netflix 之 Eureka

第五章 Spring Cloud Netflix 之 Ribbon

第六章 Spring Cloud 之 OpenFeign

第七章 Spring Cloud 之 GateWay

第八章 Spring Cloud Netflix 之 Hystrix

第九章 代码管理gitlab 使用

第十章 SpringCloud Alibaba 之 Nacos discovery

第十一章 SpringCloud Alibaba 之 Nacos Config

第十二章 Spring Cloud Alibaba 之 Sentinel

第十三章 JWT

第十四章 RabbitMQ应用

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
    • 前言
    • 1、RabbitMQ概念概念
      • 1.1、生产者和消费者
      • 1.2、队列
      • 1.3、交换机、路由键、绑定
        • 1.3.1、交换机类型
        • 2、RabbitMQ运转流程
          • 2.1、生产者发送消息流程
          • 2.2、消费者接收消息的过程
          • 2.3、AMQP协议
          • 3、RabbitMQ windows安装
            • 3.1、下载
            • 3.2、安装
            • 4、Spring Boot 整合RabbitMQ
              • 4.1、在user-service添加依赖
              • 4.2、配置文件添加
              • 4.3、增加RabbitMQ配置类
              • 4.4、新增消费监听类
              • 4.5、消息生产端
              • 总结

                前言

                一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。 MQ(消息队列)在微服务、业务活动等场景中的应用主要表现为系统解耦和削峰。

                系统解耦

                场景描述:在微服务架构中,服务与服务之间需要通信。如果采用直接调用方式,服务间会存在强依赖关系,一个服务的改动可能引发连锁反应。

                MQ作用:服务间可以通过消息队列进行通信,一个服务将消息放入队列,另一个服务从队列中取出消息进行处理。这种方式下,服务间实现了解耦,降低了相互的依赖。

                削峰

                场景描述:在业务活动期间,由于用户请求量短时间内剧增,可能导致系统压力过大甚至崩溃。

                MQ作用:通过消息队列实现请求的缓冲。在高并发场景下,系统可以将请求放入消息队列,然后异步处理这些请求,从而平滑系统的处理负载,确保系统的稳定性。

                综上所述,MQ因其独特的队列属性和消息传递模式,在分布式、微服务架构中发挥着重要的作用,提高了系统的可用性和稳定性。

                1、RabbitMQ概念概念

                RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

                在这里插入图片描述

                1.1、生产者和消费者

                • Producer:生产者,就是投递消息的一方。消息一般可以包含2个部分:消息体和标签(Label)。消息的标签用来描述这条消息,比如一个交换器的名称和一个路由键。
                • Consumer:消费者,就是接受消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)
                • Broker:消息中间件的服务节点。一个RabbitMQ Broker看做一台RabbitMQ服务器

                  在这里插入图片描述

                  1.2、队列

                  Queue:队列,是RabbitMQ的内部对象,用于存储消息

                  在这里插入图片描述

                  在这里插入图片描述

                  1.3、交换机、路由键、绑定

                  Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。

                  在这里插入图片描述

                  RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

                  Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。

                  在这里插入图片描述

                  1.3.1、交换机类型

                  • Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。

                    在这里插入图片描述

                    在这里插入图片描述

                  • Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。

                    在这里插入图片描述

                  • Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。

                    在这里插入图片描述

                  • Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。

                    在这里插入图片描述

                    自学参考:https://blog.csdn.net/qq_38550836/article/details/95358353

                    2、RabbitMQ运转流程

                    2.1、生产者发送消息流程

                    • 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
                    • 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
                    • 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
                    • 生产者通过路由键将交换器和队列绑定起来
                    • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
                    • 相应的交换器根据接收到的路由键查找相匹配的队列。
                    • 如果找到,则将从生产者发送过来的消息存入相应的队列。
                    • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
                    • 关闭信道
                    • 关闭连接

                      2.2、消费者接收消息的过程

                      • 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
                      • 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
                      • 等待RabbitMQ Broker回应并投递相应队列中队列的消息,消费者接收消息。
                      • 消费者确认(ack)接收到的消息。
                      • RabbitMQ从队列中删除相应已经被确认的消息。
                      • 关闭信道
                      • 关闭连接

                        在这里插入图片描述无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

                        2.3、AMQP协议

                        Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等

                        在这里插入图片描述

                        • Broker:接收和分发消息的应用,RabbitMQ 就是 Message Broker
                        • Virtual Host:虚拟 Broker,将多个单元隔离开
                        • Connection:publisher / consumer 和 broker 之间的 tcp 连接
                        • Channel:connection 内部建立的逻辑连接,通常每个线程创建单独的 channel
                        • Routing key:路由键,用来指示消息的路由转发,相当于快递的地址
                        • Exchange:交换机,相当于快递的分拨中心
                        • Queue:消息队列,消息最终被送到这里等待 consumer 取走
                        • Binding:exchange 和 queue 之间的虚拟连接,用于 message 的分发依据

                          3、RabbitMQ windows安装

                          3.1、下载

                          https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe

                          https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe

                          3.2、安装

                          配置环境变量

                          在这里插入图片描述

                          在这里插入图片描述

                          cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin

                          开启rabbitmq-plugins插件

                          rabbitmq-plugins enable rabbitmq_management

                          在这里插入图片描述

                          打开地址

                          http://127.0.0.1:15672/

                          在这里插入图片描述

                          输入用户名/密码:guest/guest

                          4、Spring Boot 整合RabbitMQ

                          4.1、在user-service添加依赖

                          
                              org.springframework.boot
                              spring-boot-starter-amqp
                          
                          

                          4.2、配置文件添加

                          spring:
                            rabbitmq:
                              host: 127.0.0.1
                              port: 5672
                              username: guest
                              password: guest
                          

                          4.3、增加RabbitMQ配置类

                          package com.xxxx.user.config;
                          import org.springframework.amqp.core.*;
                          import org.springframework.beans.factory.annotation.Qualifier;
                          import org.springframework.context.annotation.Bean;
                          import org.springframework.context.annotation.Configuration;
                          @Configuration
                          public class RabbitMQConfig {
                              /******************direct**********************/
                              /**
                               * 创建direct队列
                               * @return
                               */
                              @Bean
                              public Queue directQueue(){
                                  return new Queue("directQueue");
                              }
                              /**
                               * 创建direct交换机
                               * @return
                               */
                              @Bean
                              public DirectExchange directExchange(){
                                  return new DirectExchange("directExchange");
                              }
                              /**
                               * 把队列和交换机绑定在一起
                               * @param queue
                               * @param directExchange
                               * @return
                               */
                              @Bean
                              public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){
                                  return BindingBuilder.bind(queue).to(directExchange).with("routingKey");
                              }
                              /******************topic**********************/
                              @Bean
                              public Queue topicQuerue1(){
                                  return new Queue("topicQuerue1");
                              }
                              @Bean
                              public Queue topicQuerue2(){
                                  return new Queue("topicQuerue2");
                              }
                              @Bean
                              public TopicExchange topicExchange(){
                                  return new TopicExchange("topicExchange");
                              }
                              @Bean
                              public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
                                  return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");
                              }
                              /**
                               * 通配符:* 表示一个词,# 表示零个或多个词
                               * @param queue
                               * @param topicExchange
                               * @return
                               */
                              @Bean
                              public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
                                  return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
                              }
                              /******************fanout**********************/
                              @Bean
                              public Queue fanoutQueue1(){
                                  return new Queue("fanoutQueue1");
                              }
                              @Bean
                              public Queue fanoutQueue2(){
                                  return new Queue("fanoutQueue2");
                              }
                              @Bean
                              public Queue fanoutQueue3(){
                                  return new Queue("fanoutQueue3");
                              }
                              @Bean
                              public FanoutExchange fanoutExchange(){
                                  return new FanoutExchange("fanoutExchange");
                              }
                              @Bean
                              public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
                                  return BindingBuilder.bind(queue).to(fanoutExchange);
                              }
                              @Bean
                              public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
                                  return BindingBuilder.bind(queue).to(fanoutExchange);
                              }
                              @Bean
                              public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
                                  return BindingBuilder.bind(queue).to(fanoutExchange);
                              }
                          }
                          

                          4.4、新增消费监听类

                          package com.xxxx.user.consumer;
                          import com.xxxx.drp.common.entity.UserInfo;
                          import lombok.extern.slf4j.Slf4j;
                          import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                          import org.springframework.amqp.rabbit.annotation.RabbitListener;
                          import org.springframework.stereotype.Component;
                          @Component
                          @Slf4j
                          @RabbitListener(queues = "directQueue")
                          public class DataDirectReceiver {
                              @RabbitHandler
                              public void process(String data){
                                  log.info("收到directQueue队列信息:" + data);
                              }
                              @RabbitHandler
                              public void process(UserInfo data){
                                  log.info("收到directQueue队列信息:" + data);
                              }
                          }
                          
                          package com.xxxx.user.consumer;
                          import com.xxxx.common.entity.UserInfo;
                          import lombok.extern.slf4j.Slf4j;
                          import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                          import org.springframework.amqp.rabbit.annotation.RabbitListener;
                          import org.springframework.stereotype.Component;
                          @Component
                          @Slf4j
                          @RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
                          public class DataFanoutReceiver {
                              @RabbitHandler
                              public void process(String data){
                                  log.info("收到topicQuerue队列信息:" + data);
                              }
                              @RabbitHandler
                              public void process(UserInfo data){
                                  log.info("收到topicQuerue队列信息:" + data);
                              }
                          }
                          
                          package com.xxxx.user.consumer;
                          import com.xxxx.common.entity.UserInfo;
                          import lombok.extern.slf4j.Slf4j;
                          import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                          import org.springframework.amqp.rabbit.annotation.RabbitListener;
                          import org.springframework.stereotype.Component;
                          @Component
                          @Slf4j
                          @RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
                          public class DataTopicReceiver {
                              @RabbitHandler
                              public void process(String data){
                                  log.info("收到topicQuerue队列信息:" + data);
                              }
                              @RabbitHandler
                              public void process(UserInfo data){
                                  log.info("收到topicQuerue队列信息:" + data);
                              }
                          }
                          

                          4.5、消息生产端

                          package com.xxxx.user;
                          import com.xxxx.common.entity.UserInfo;
                          import org.junit.jupiter.api.Test;
                          import org.springframework.amqp.rabbit.core.RabbitTemplate;
                          import org.springframework.beans.factory.annotation.Autowired;
                          import org.springframework.boot.test.context.SpringBootTest;
                          @SpringBootTest
                          public class DataSender {
                              @Autowired
                              private RabbitTemplate rabbitTemplate;
                              @Test
                              public void sendDirect(){
                                  UserInfo userInfo = new UserInfo();
                                  userInfo.setUserAccount("tiger");
                                  userInfo.setPassword("12345");
                                  this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo);
                              }
                              @Test
                              public void sendTopic(){
                                  this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic");
                              }
                              @Test
                              public void sendFanout(){
                                  this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic");
                              }
                          }
                          

                          总结

                          MQ因其独特的队列属性和消息传递模式,在分布式、微服务架构中发挥着重要的作用,提高了系统的可用性和稳定性。人工智能AI编程知识库