『RabbitMQ』入门指南(安装,配置,应用)
作者:mmseoamin日期:2023-12-11

前言

RabbitMQ 是在 AMQP(Advanced Message Queuing Protocol) 协议标准基础上完整的,可复用的企业消息系统。它遵循 Mozilla Public License 开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,建立在 Erlang OTP 平台上(因为采用 Erlang 开发,所以 RabbitMQ 稳定性和可靠性比较高)

其他主流 MQ 产品

  • ActiveMQ:Apache 出品,最流行的,能力强劲的开源消息总线,基于 JMS(Java Message Service)规范
  • RocketMQ:阿里低延迟、高并发、高可用、高可靠的分布式消息中间件,基于 JMS,目前由 Apache 基金会维护
  • Kafka:分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式 MQ 系统),可以用于搜索日志,监控日志,访问日志等

    本文为 RabbitMQ 入门教程,主要将会讲解 RabbitMQ 安装配置(Windows),相关概念,及项目中具体应用

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第1张

    安装

    Erlang

    官网下载链接:Downloads - Erlang/OTP

    RabbitMQ 服务器必须首先安装 Erlang 运行环境,同时安装时需要注意 RabbityMQ 所依赖的 Erlang 版本,我们可以查看下方官方版本对应信息

    版本对应:RabbitMQ Erlang Version Requirements — RabbitMQ

    本次使用版本 Erlang OTP 25.3(点击跳转下载链接)

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第2张

    双击执行 exe 安装程序,除了安装路径其他都按照默认即可

    然后配置环境变量

    ERLANG_HOME = D:\Erlang\Erlang\Erlang OTP
    

    并且添加 /bin 目录到 Path 环境变量中,即添加 %ERLANG_HOME%\bin 到 Path 中

    安装配置之后,打开 CMD,输入 erl 然后回车键,会弹出版本信息,表示 Erlang 安装成功

    RabbitMQ

    官方下载页面:RabbitMQ Changelog — RabbitMQ

    下载链接: RabbitMQ 3.12.0

    安装 exe 文件,执行安装包,同样除了安装路径外其他保持默认

    配置环境变量

    RABBITMQ_SERVER = D:\RabbitMQ\RabbitMQ\rabbitmq_server-3.12.0
    

    然后添加 %RABBITMQ_SERVER%\sbin 到 Path 环境变量中

    查看所有插件

    rabbitmq-plugins list
    

    注:如果出现问题请参考最后一章 彻底卸载

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第3张

    之后我们需要安装 rabbitmq_management 插件,可以使用可视化的方式查看 RabbitMQ 服务器实例的状态,以及操控 RabbitMQ 服务器

    # 安装插件
    rabbitmq-plugins enable rabbitmq_management
    

    访问管理界面: http://localhost:15672/ (账号密码:guest / guest)

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第4张

    前期安装配置完毕,下面可以配合官方入门文档学习

    官方文档:RabbitMQ Tutorials — RabbitMQ

    消息队列

    定义

    消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

    “消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第5张

    作用

    解耦。如图所示。假设有系统 B、C、D 都需要系统 A 的数据,于是系统 A 调用三个方法发送数据到 B、C、D。这时,系统 D 不需要了,那就需要在系统 A 把相关的代码删掉。假设这时有个新的系统 E 需要数据,这时系统 A 又要增加调用系统 E 的代码。为了降低这种强耦合,就可以使用 MQ,系统 A 只需要把数据发送到 MQ,其他系统如果需要数据,则从 MQ 中获取即可

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第6张

    异步。如图所示。一个客户端请求发送进来,系统 A 会调用系统 B、C、D 三个系统,同步请求的话,响应时间就是系统 A、B、C、D 的总和,也就是 800ms。如果使用 MQ,系统 A 发送数据到 MQ,然后就可以返回响应给客户端,不需要再等待系统 B、C、D 的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用 MQ

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第7张

    削峰。如图所示。这其实是 MQ 一个很重要的应用。假设系统 A 在某一段时间请求数暴增,有 5000 个请求发送过来,系统 A 这时就会发送 5000 条 SQL 进入 MySQL 进行执行,MySQL 对于如此庞大的请求当然处理不过来,MySQL 就会崩溃,导致系统瘫痪。如果使用 MQ,系统 A 不再是直接发送 SQL 到数据库,而是把数据发送到 MQ,MQ 短时间积压数据是可以接受的,然后由消费者每次拉取 2000 条进行处理,防止在请求峰值时期大量的请求直接发送到 MySQL 导致系统崩溃

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第8张

    特点

    可靠性:通过支持消息持久化,支持事务,支持消费和传输的 ack 等来确保可靠性

    路由机制:支持主流的订阅消费模式,如广播,订阅,headers 匹配等

    扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点

    高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用

    多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP,MQTT 等多种消息中间件协议

    多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等

    管理界面:RabbitMQ 提供了易用的用户界面,使得用户可以监控和管理消息、集群中的节点等

    插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件

    应用

    本章将会集成 rabbitmq 到 SpringBoot 中,并使用 rabbitmq-provider (生产者)和 rabbitmq-consumer(消费者) 两个项目进行具体讲解, 也可以在父项目中创建这两个模块(本文采用父子模块方式)

    所有代码示例已经上传到 GitHub 仓库

    仓库地址:ReturnTmp/rabbitmq-demo: rabbitmq 实例代码 (github.com)

    生产者

    配置

    创建子模块 rabbitmq-provider

    依赖配置(也可以 IDEA 初始化模块直接勾选)

            
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
                org.springframework.boot
                spring-boot-starter-web
            
    

    application.yml

    server:  
      port: 8021  
    spring:  
      application:  
        name: rabbitmq-provider  
      rabbitmq:  
        host: 127.0.0.1  
        port: 5672  
        username: root  
        password: 111111  
        virtual-host: RootHost
    

    其中虚拟 host 配置项不是必须的,需要自行创建 vhost,如果未自行创建,默认为 virtual-host: /

    注:vhost 可以理解为虚拟 broker,即 mini-RabbitMQ server,其内部均含有独立的 queue、bind、exchange 等,最重要的是拥有独立的权限系统,可以做到 vhost 范围内的用户控制。当然,从 RabbitMQ 全局角度,vhost 可以作为不同权限隔离的手段

    可以按照如下步骤创建 vhost

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第9张

    然后创建用户(管理员)

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第10张

    然后我们需要为用户分配权限,指定使用我们刚刚创建的 vhost

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第11张

    代码

    创建直连交换机配置类

    注:RabbitMQ 共有四种交换机,分别为:直连交换机,扇形交换机,主题交换机,首部交换机。这里使用直连交换机演示,其他读者可以自行尝试

    @Configuration
    public class DirectRabbitConfig {
        //队列 起名:TestDirectQueue
        @Bean
        public Queue TestDirectQueue() {
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
            //   return new Queue("TestDirectQueue",true,true,false);
            //一般设置一下队列的持久化就好,其余两个就是默认false
            return new Queue("TestDirectQueue", true);
        }
        //Direct交换机 起名:TestDirectExchange
        @Bean
        DirectExchange TestDirectExchange() {
            //  return new DirectExchange("TestDirectExchange",true,true);
            return new DirectExchange("TestDirectExchange", true, false);
        }
        //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
        @Bean
        Binding bindingDirect() {
            return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
        }
        @Bean
        DirectExchange lonelyDirectExchange() {
            return new DirectExchange("lonelyDirectExchange");
        }
    }
    

    然后写简单的接口进行消息推送(可以视情况写为定时任务)

    @RestController
    public class SendMessageController {
        @Autowired
        RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法
        @GetMapping("/sendDirectMessage")
        public String sendDirectMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "test message, hello!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
            rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
            return "ok";
        }
    }
    

    启动项目,调用接口: http://localhost:8021/sendDirectMessage

    查看 RabbitMQ 管理页面查看是否推送成功

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第12张

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第13张

    消费者

    配置

    创建子模块 rabbitmq-consumer

    依赖配置

            
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
                org.springframework.boot
                spring-boot-starter-web
            
    

    application.yml

    server:  
      port: 8022  
    spring:  
      application:  
        name: rabbitmq-consumer  
      rabbitmq:  
        host: 127.0.0.1  
        port: 5672  
        username: root  
        password: 111111  
        virtual-host: RootHost
    
    代码

    创建消息接收监听类

    @Component
    @RabbitListener(queues = "TestDirectQueue")
    public class DirectReceiver {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("DirectReceiver receive message: " + testMessage.toString());
        }
    }
    

    之后启动项目,查看消费者接收情况

    『RabbitMQ』入门指南(安装,配置,应用),image.png,第14张

    序列化

    发送接收消息可能出现 Failed to convert message 问题,可以通过使用 JSON 序列化传输信息方式解决

    生产者
    @Configuration
    public class RabbitMQConfig implements InitializingBean {
        /**
         * 自动注入RabbitTemplate模板
         */
        @Resource
        private RabbitTemplate rabbitTemplate;
        /**
         * 发送消息JSON序列化
         */
        @Override
        public void afterPropertiesSet() {
            //使用JSON序列化
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        }
    }
    
    消费者
    @Configuration
    public class RabbitMQConfig {
        @Bean
        public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
            return new Jackson2JsonMessageConverter(objectMapper);
        }
    }
    

    彻底卸载

    我们安装中可能出现各种问题,一般情况下是 RabbitMQ 和 Erlang 版本不对应,需要完全卸载 RabbitMQ 和 Erlang,可以按照如下步骤卸载

    注:博主首次安装使用的是 Erlang 20.3 Rabbit 3.7.15 ,之后似乎小版本不对应,出现问题,需要重新卸载安装

    (1)打开 Windows 控制面板,双击“程序和功能”。

    (2)在当前安装的程序列表中,右键单击 RabbitMQ Server,然后单击“卸载”。

    (3)在当前安装的程序列表中,右键单击“Erlang OTP”,然后单击“卸载”。

    (4)打开 Windows 任务管理器。

    (5)在任务管理器中,查找进程 epmd.exe。 如果此进程仍在运行,请右键单击该进程,然后单击“结束进程”。

    (6)删除 RabbitMQ 和 Erlang 的所有安装目录。

    (7)删除文件 C:\Windows\System32\config\systemprofile.erlang.cookie(如果存在)。

    (8)转到用户文件夹:C:\Users\[username],然后删除文件.erlang.cookie。

    (9)同样在 User 文件夹中,转到 AppData \ Roaming \ RabbitMQ。删除 RabbitMQ 文件夹。

    (10)删除注册表 HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 的子项。

    (11)打开运行 cmd->sc delete RabbitMQ。

    (12)打开运行->regedit 找到 RabbitMQ 节点,删掉即可(如果存在)

    参考链接

    • Windows 下安装 RabbitMQ 服务器及基本配置 - 蓝之风 - 博客园 (cnblogs.com)
    • RabbitMQ Windows 安装、配置、使用 - 小白教程-阿里云开发者社区 (aliyun.com)
    • Windows 如何完全卸载 RabbitMQ 和 Erlang 删除注册表
    • windows 下 Erlang 与 RabbitMQ 重新安装时,由于卸载不干净导致各类错误
    • 超详细的 RabbitMQ 入门,看这篇就够了!-阿里云开发者社区 (aliyun.com)
    • RabbitMQ 整合 Spring Boot,实现 Hello World
    • Springboot 整合 RabbitMq ,用心看完这一篇就够了
    • RabbitMq 核心知识点小结 - 知乎 (zhihu.com)
    • RabbitMQ消费消息坑:failed to convert serialized Message content - jiuchengi

      本文由博客一文多发平台 OpenWrite 发布!