MQ 全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。
消息,两台计算机间传送的数据单位。可以非常简单,也可以更复杂。
队列,数据结构中概念。在队列中,数据先进先出,后进后出。
优点:
生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可,而不需要和其他系统有耦合,这显然也提高了系统的扩展性。
将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。
先将短时间内高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
缺点:
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。
一致性问题
A 系统处理完业务,通过 MQ 给 B、C、D 三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败,则会造成数据处理的不一致。
应用解耦
在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:
如果在系统中引入 MQ,即订单系统将消息先发送到 MQ 中,MQ 再转发到其他系统,则会解决以下问题:
异步提速
如果订单系统同步访问每个系统,则用户下单等待时长为920;如果引入 MQ,则用户下单等待时长为25
削峰限流
假设我们的系统每秒只能承载 1000 请求,如果请求瞬间增多到每秒 5000,则会造成系统崩溃。此时引入 MQ 即可解决该问题
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“限流”。
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受中间件产品、开发语言等条件的限制。类比 HTTP 协议。
JMS,即 Java Message Service,是 Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。一种规范,和 JDBC、Jedis 担任的角色类似。
区别:
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Apache | RabbitMQ | 阿里 | Apache |
开发语言 | Java | Erlang | Java | Scala&Java |
协议支持 | AMQP,REST,XMPP,STOMP | AMQP,XMPP,SMTP,STOMP | 自定义 | 自定义协议,社区封装了 HTTP 协议支持 |
客户端支持语言 | Java,C,C++,Python,PHP等 | 官方支持 Erlang,Java等,社区产出多种 API,几乎支持所有语言 | Java,C++ | 官方支持 Java,社区产出多种 API,如 PHP,Python等 |
单机吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
消息延迟 | 毫秒级 | 微妙级 | 毫秒级 | 毫秒以内 |
高可用 | 主从架构 | 镜像集群模式 | 分布式架构 | 分布式架构 |
消息可靠性 | 较低概率丢失数据 | 不丢失数据 | 保证数据绝不丢失 | |
功能特性 | 老牌产品,成熟度高,文档较丰富 | 并发能力强,性能极其好,支持一些消息中间件的高级功能,延时低,社区活跃,管理界面较为丰富 | MQ性能比较完备,扩展性强,支持大量的消息中间件高级功能 | 只支持主要的 MQ 功能(接收与发送),主要应用于大数据领域 |
RabbitMQ 是由 Erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
消息的生产者。也是一个向交换机发布消息的客户端应用程序。
连接。生产者/消费者和 RabbitMQ 服务器之间建立的 TCP 连接。
信道。是 TCP 里面的虚拟连接。例如:Connection 相当于电缆,Channel 相当于独立光纤束,一条 TCP 连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。
消息队列服务器实体。即 RabbitMQ 服务器
虚拟主机。出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中。每个 Virtual Host 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个 RabbitMQ 服务器时,可以划分出多个虚拟主机。RabbitMQ 默认的虚拟主机路径是 /
交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。
rabbitMQ 交换机类型有4种
- direct(直连):它会把消息路由到那些 BindingKey RoutingKey完全匹配的队列中。
- fanout(扇形):它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
- topic(主题):将消息路由到 BindingKey RoutingKey 相匹配的队列中。
- headers(标题):交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中headers 属性进行匹配。
直连交换机(dirext exchange)为 RabbitMQ 默认的交换机。
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。
消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
拓展:RabbitMQ 为什么使用信道而不直接使用 TCP 连接通信?
TCP 连接的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次挥手。高峰时每秒成千上万条 TCP 连接的创建会造成资源巨大的浪费。而且操作系统每秒处理 TCP 连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
RabbitMQ 是使用 Erlang 语言编写的,所以在安装 RabbitMQ 前需要先安装 Erlang 环境
#安装Erlang所需的依赖 [root@localhost opt]# yum install -y epel-release #添加存储条目 [root@localhost opt]# wget https://packages.erlangsolutions.com/erlang-solutions-1.0-1.noarch.rpm [root@localhost opt]# rpm -Uvh erlang-solutions-1.0-1.noarch.rpm #安装Erlang [root@localhost opt]# yum install -y erlang #查看Erlang是否安装成功 [root@localhost opt]# erl -version
1)为了外部能够正常访问 RabbitMQ 服务,先关闭防火墙
# 关闭运行的防火墙 [root@localhost opt]# systemctl stop firewalld.service # 禁止防火墙自启动 [root@localhost opt]# systemctl disable firewalld.service
2)RabbitMQ 是通过主机名进行访问的,必须给服务器添加主机名
# 修改文件 [root@localhost opt]# vim /etc/sysconfig/network # 添加如下内容(名字可自定义) NETWORKING=yes HOSTNAME=lyl # 修改文件 [root@localhost opt]# vim /etc/hosts # 添加如下内容:服务器ip 主机名 192.168.43.100 lyl
3)使用 Xftp 上传 RabbitMQ 压缩文件(此处我用的版本是 3.9.13)
4)安装 RabbitMQ
# 解压RabbitMQ [root@localhost opt]# tar -xvf rabbitmq-server-generic-unix-3.9.13.tar.xz # 重命名 [root@localhost opt]# mv rabbitmq_server-3.9.13 rabbitmq # 移动文件夹 [root@localhost opt]# mv rabbitmq /usr/local/
5)配置环境变量
# 编辑/etc/profile文件 [root@localhost opt]# vim /etc/profile #添加如下内容 export PATH=$PATH:/usr/local/rabbitmq/sbin # 运行文件,让修改内容生效 [root@localhost opt]# source /etc/profile
6)开启管控台插件
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
7)后台运行 RabbitMQ
#启动rabbitmq [root@localhost opt]# rabbitmq-server -detached #停止rabbitmq [root@localhost opt]# rabbitmqctl stop
8)通过管控台访问 RabbitMQ,路径:ip地址:15672 ,用户名:guest,密码:guest
RabbitMQ 默认端口是 5672,管控台的默认端口为 15672
9)此时会提示 guest 账户只允许本地使用,我们可以配置允许使用 guest 远程访问
# 创建配置文件夹 [root@localhost opt]# mkdir -p /usr/local/rabbitmq/etc/rabbitmq # 创建配置文件 [root@localhost opt]# vim /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf # 添加如下内容 loopback_users=none # 重启RabbitMQ(停止服务、重新加载配置文件、开启服务) [root@localhost opt]# rabbitmqctl stop_app [root@localhost opt]# rabbitmqctl reset [root@localhost opt]# rabbitmqctl start_app
guest 账户默认只允许本地使用,我们也可以创建新账户远程访问 RabbitMQ(可以通过命令方式,也可以通过管控台可视化方式)
# 创建账户(后面为用户名和密码) [root@localhost opt]# rabbitmqctl add_user lyl lyl #给用户授予管理员角色 [root@localhost opt]# rabbitmqctl set_user_tags lyl administrator #给用户授权("/"表示虚拟主机、"lyl"表示用户名、".*"".*"".*"表示完整权限) [root@localhost opt]# rabbitmqctl set_permissions -p "/" lyl ".*" ".*" ".*" #查看所有用户 [root@localhost opt]# rabbitmqctl list_users Listing users ... user tags lyl [administrator] guest [administrator]
以下即为管控台界面
可以在 Admin(系统管理中)添加用户~
RabbitMQ 共有六种工作模式:
特点:
1.1 引入依赖
com.rabbitmq amqp-client 5.14.0
1.2 生产者代码
/** * Description: 简单模式生产者 */ public class Producer { //队列名称 private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.43.100"); factory.setPort(5672); factory.setUsername("lyl"); factory.setPassword("lyl"); factory.setVirtualHost("/"); // 2.创建连接 Connection connection = factory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.创建队列,如果已经存在,则直接使用 /** * 方法参数: * 参数1: 队列名称 * 参数2: 是否持久化。true表示队列会保存磁盘,MQ重启后队列还在 * 参数3: 是否私有化。true表示只有第一次拥有它的消费者才能访问,false表示所有消费者都能访问 * 参数4: 是否自动删除。true表示不再使用队列时自动删除队列 * 参数5: 队列其他参数。如:x-message-ttl,x-expires等 */ channel.queueDeclare(QUEUE_NAME, false, false ,false, null); // 5.发送消息 String message = "hello rabbitmq!"; /** * 方法参数: * 参数1: 交换机名,""表示默认交换机 * 参数2: 路由key,简单模式为队列名 * 参数3: 消息的其他属性,如路由头等 * 参数4: 消息体(字节数组格式) */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 6.关闭信道和连接 channel.close(); connection.close(); System.out.println("生产消息成功~~~"); } }
运行生产者后,我们可以看到在 RabbitMQ 中创建了队列,队列中已经有了消息,具体详情如下
1.3 消费者代码
消费者需要一直监听队列,代码如下:
/** * Description: 简单模式消费者 */ public class Consumer { //队列名称 private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.43.100"); factory.setPort(5672); factory.setUsername("lyl"); factory.setPassword("lyl"); factory.setVirtualHost("/"); // 2.创建连接 Connection connection = factory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.监听队列 /** * 方法参数: * 参数1: 监听的队列名 * 参数2: 是否自动签收。如果为false,则需要手动确认消息已收到,否则MQ会一直发送消息 * 参数3: Consumer的实现类,重写该类方法表示接收到消息后如何消费 */ channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("接收到的消息为: " + message); } }); } }
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用 direct 交换机,应用于处理消息较多的情况。特点如下:
2.1 生产者代码
由于连接 RabbitMQ 的操作都一样,所以这里我们将代码抽取出来进行封装,封装如下:(注意:没有关闭资源连接)
/** * Description: RabbitMQ工具类 */ public class RabbitMqUtil { public static Channel getChannel() throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.43.100"); factory.setPort(5672); factory.setUsername("lyl"); factory.setPassword("lyl"); factory.setVirtualHost("/"); // 2.创建连接 Connection connection = factory.newConnection(); // 3.建立信道 return connection.createChannel(); } }
生产者代码如下:
/** * Description: 工作队列模式生产者 */ public class Producer { //队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.通过工具类建立信道 Channel channel = RabbitMqUtil.getChannel(); // 2.创建队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 3.发送批量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中 for (int i = 1; i <= 10; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, ("发送成功,这是第 " + i + " 条消息").getBytes()); } // 4.关闭资源 channel.close(); } }
2.2 消费者代码
这里我们编写两个消费者去消费生产出来的消息,两个消费者的代码大体一致,不同的是输出内容,所以此处只展示消费者01的代码:
/** * Description: 工作队列模式消费者01 */ public class Consumer01 { //队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消费者01消费成功,内容为: " + message); } }); } }
2.3 测试
消费者01消费结果如下:
消费者02消费结果如下:
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe),特点如下:
3.1 生产者代码
/** * Description: 发布订阅模式生产者 */ public class Producer { //交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; //队列名称,分别为: 邮箱队列、信息队列、站内信队列 private static final String EMAIL_QUEUE_NAME = "send_email_queue"; private static final String MESSAGE_QUEUE_NAME = "send_message_queue"; private static final String STATION_QUEUE_NAME = "send_station_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); //创建fanout交换机 /** * 参数1:交换机名 * 参数2:交换机类型 * 参数3:交换机是否持久化 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true); //创建队列 channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null); channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null); channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null); //交换机绑定队列 /** * 参数1:队列名 * 参数2:交换机名 * 参数3:路由关键字,发布订阅模式写""即可 */ channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, ""); channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, ""); channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, ""); //发送消息 String message = "京东618活动马上就要开始啦! 欢迎您登录京东参与~~~"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); //关闭资源 channel.close(); } }
创建的交换机详情如下:
创建的队列详情如下:
3.2 消费者代码
这里我们编写三个消费者,分别为短信消费者、邮件消费者、站内信消费者,三者的代码大体都一致,不同的是监听的对列名以及输出内容,所以此处只展示短信消费者的代码:
/** * Description: 短信消费者 */ public class MessageConsumer { private static final String MESSAGE_QUEUE_NAME = "send_message_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); channel.basicConsume(MESSAGE_QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("发送短信成功,短信内容为: " + message); } }); } }
注意:也可以使用 工作队列 + 发布订阅 模式同时使用,即两个消费者同时监听一个队列
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,618大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用 路由模式 (Routing)完成这一需求。特点如下:
每个队列绑定路由关键字 RoutingKey
生产者将带有 RoutingKey 的消息发送给交换机,交换机根据 RoutingKey 转发到指定队列。
路由模式使用 direct 交换机。
能按照路由键将消息发送给指定队列
4.1 生产者代码
由于此模式是在发布订阅模式基础上新增了路由绑定规则,进而实现将消息发送给指定队列的功能,所以代码与上面代码大致相同,不过添加了路由key,具体代码如下:
/** * Description: 路由模式生产者 */ public class Producer { //交换机名称 private static final String EXCHANGE_NAME = "routing_exchange"; //队列名称,分别为: 邮箱队列、信息队列、站内信队列 private static final String EMAIL_QUEUE_NAME = "send_email_queue"; private static final String MESSAGE_QUEUE_NAME = "send_message_queue"; private static final String STATION_QUEUE_NAME = "send_station_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); //创建direct交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); //创建队列 channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null); channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null); channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null); //交换机绑定队列(重要的、普通的) channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "important"); channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "important"); channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "important"); channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "common"); //发送消息(重要消息全部发送,否则只发送站内消息) String importMessage = "京东618活动马上就要开始啦! 欢迎您登录京东参与~~~"; String commonMessage = "京东满减优惠开始啦,诚邀您参与~~~"; channel.basicPublish(EXCHANGE_NAME, "important", null, importMessage.getBytes()); channel.basicPublish(EXCHANGE_NAME, "common", null, commonMessage.getBytes()); //关闭资源 channel.close(); } }
注意:
由于发布订阅模式中已经创建了 send_email_queue、send_message_queue、send_station_queue 三个队列,并且我们没有设置自动删除,所以 RabbitMQ 中一直有这三个队列,我们可以不用创建队列,也可以直接使用,此处为了代码的完整性,选择创建,queueDeclare() 方法如果队列已经存在,则直接使用了~
4.2 消费者代码
消费者可以直接使用上面发布订阅模式的三个消费者,故此处就不再展示。
下面为站内信消费者的消费结果,其他两个消费者的结果为一条,站内信消费者的结果为两条:
通配符模式(Topics)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的 RoutingKey 能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,通配符模式使用 topic 交换机。 能按照通配符规则将消息发送给指定队列
通配符规则如下:
5.1 生产者代码
由于此模式是在路由模式基础上给队列绑定了带通配符的路由关键字,进而实现将消息按照通配符规则发送给指定队列,所以代码与上面代码大致相同,具体代码如下:
/** * Description: 通配符模式生产者 */ public class Producer { //交换机名称 private static final String EXCHANGE_NAME = "topic_exchange"; //队列名称,分别为: 邮箱队列、信息队列、站内信队列 private static final String EMAIL_QUEUE_NAME = "send_email_queue"; private static final String MESSAGE_QUEUE_NAME = "send_message_queue"; private static final String STATION_QUEUE_NAME = "send_station_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); //创建direct交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); //创建队列 channel.queueDeclare(EMAIL_QUEUE_NAME, true, false, false, null); channel.queueDeclare(MESSAGE_QUEUE_NAME, true, false, false, null); channel.queueDeclare(STATION_QUEUE_NAME, true, false, false, null); //交换机绑定队列 channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "#.email.#"); channel.queueBind(MESSAGE_QUEUE_NAME, EXCHANGE_NAME, "#.message.#"); channel.queueBind(STATION_QUEUE_NAME, EXCHANGE_NAME, "#.station.#"); //发送消息 String importMessage = "京东618活动马上就要开始啦! 欢迎您登录京东参与~~~"; String commonMessage = "京东满减优惠开始啦,诚邀您参与~~~"; //类似于模糊匹配 channel.basicPublish(EXCHANGE_NAME, "email.message.station", null, importMessage.getBytes()); channel.basicPublish(EXCHANGE_NAME, "station", null, commonMessage.getBytes()); //关闭资源 channel.close(); } }
5.2 消费者代码
消费者可以直接使用上面发布订阅模式的三个消费者,故此处就不再展示。
下面为站内信消费者的消费结果,其他两个消费者的结果为一条,站内信消费者的结果为两条:
1. 添加依赖
org.springframework.boot spring-boot-starter-amqp org.projectlombok lombok
2. 配置文件
server: port: 8888 spring: rabbitmq: host: 192.168.43.100 port: 5672 username: lyl password: lyl virtual-host: /
3. 配置类
SpringBoot 整合 RabbitMQ 时,需要在配置类中创建队列和交换机,具体如下:
/** * Description: RabbitMq配置类 */ @Configuration public class RabbitConfig { private static final String TOPIC_EXCHANGE_NAME = "topicExchange"; private static final String QUEUE_NAME = "bootQueue"; /** * 创建交换机 */ @Bean("topicExchange") public Exchange getExchange() { return ExchangeBuilder //交换机类型 .topicExchange(TOPIC_EXCHANGE_NAME) //是否持久化 .durable(true) .build(); } /** * 创建队列 */ @Bean("bootQueue") public Queue getQueue() { return QueueBuilder //队列持久化 .durable(QUEUE_NAME) .build(); } /** * 交换机绑定队列 */ @Bean public Binding bingMessageQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){ return BindingBuilder .bind(queue) .to(exchange) //路由规则 .with("#.message.#") //没有其他参数 .noargs(); } }
4. 生产者代码
SpringBoot 整合 RabbitMQ 时,提供了工具类 RabbitTemplate 发送消息,编写生产者时只需要注入 RabbitTemplate 即可发送消息。
@SpringBootTest class RabbitmqProducerApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() { /** * 发送消息 * 1.exchange: 交换机 * 2.routingKey: 路由key * 3.object: 发送的消息 */ rabbitTemplate.convertAndSend("topicExchange","message","快来参加京东618活动哦~~~"); } }
5. 消费者代码
此处需要编写另一个 SpringBoot 项目作为 RabbitMQ 的消费者,因为如果在一个项目中可以通过方法调用就行了,没有必要通过 RabbitMQ 来进行通信了
搭建 RabbitMQ 的 SpringBoot 项目步骤与前面相同,这里直接展示消费者代码:
/** * Description: 消费者 */ @Component public class Consumer { /** * 监听队列 */ @RabbitListener(queues = "bootQueue") public void listenMessage(String message) { System.out.println("接收消息:" + message); } }
@RabbitListener 注解用来监听队列,放在具体的方法上面。
RabbitMQ 消息的投递路径为:
生产者 ------> 交换机 ------> 队列 ------> 消费者
在 RabbitMQ 工作的过程中,每个环节消息都有可能传递失败,RabbitMQ 可以通过以下三种模式来监听消息时候投递成功:
确认模式可以监听消息是否从生产者成功传递到交换机,具体使用如下:
1.1 在生产者模块的配置文件中加入配置
spring: rabbitmq: # 开启确认模式 publisher-confirm-type: correlated
spring.rabbitmq.publisher-confirm-type 属性有三个值:
- NONE:禁用发布确认模式,默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法,一般使用此值
- SIMPLE:简单使用
CORRELATED 和 SIMPLE 的区别:
- CORRELATED:发布者将收到一个带有确认标识 的确认消息,该标识与发布的每个消息相关联。这种模式可以确保每个消息被确认,但需要在发送和接收确认之间进行映射。
- SIMPLE:发布者将仅收到一条确认消息,表示所有消息都已经成功发布。这种模式相对简单,但不能保证每个消息都成功确认。
1.2 回调接口类
/** * Description: 定义确认模式的回调方法,消息向交换机发送后会调用 confirm()方法 */ @Component @Slf4j public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void setConfirmCallback(){ //将当前实现类注入到rabbitTemplate的确认回调 rabbitTemplate.setConfirmCallback(this); } /** * 交换机确认回调接口 * @param correlationData 存储消息的ID和自己存储的关于该条消息的信息 * @param ack 交换机是否接收成功 * @param cause 异常原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = null != correlationData ? correlationData.getId() : ""; if (ack) { log.info("交换机接收到id为:{}的消息", id); }else { log.info("交换机未接收到id为:{}的消息,原因为{}", id, cause); //TODO 交换机未收到消息,可以进行对应的业务处理 } } }
1.3 控制类
@RestController @Slf4j @RequestMapping("/producer") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/confirm/{msg}") public void sendMsgWithConfirm(@PathVariable String msg) { rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME, "message", msg, new CorrelationData()); log.info("生产者发送消息:{}", msg); } }
1.4 使用 Postman 进行测试
正常发送,控制台输出如下:
发送失败的情况,将交换机的名称改为不存在的,控制台输出如下:
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,此时通过开始消息回退将消息传递过程中不可达目的地时将消息返回给生产者。
退回模式可以监听消息是否从交换机成功传递到队列,具体使用如下:
2.1 添加配置
spring: rabbitmq: # 开启回退模式 publisher-returns: true
2.2 回调接口类
/** * Description: 定义退回模式的回调方法。交换机发送到队列失败后才会执行 returnedMessage()方法 */ @Component @Slf4j public class MyReturnCallback implements RabbitTemplate.ReturnsCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void setReturnsCallback() { rabbitTemplate.setReturnsCallback(this); } /** * 消息回退接口,只有当消息无法传递到目的地时才进行回退 * @param returned 失败后将失败信息封装到参数中 */ @Override public void returnedMessage(ReturnedMessage returned) { log.error("消息 {}, 被交换机 {} 退回, 应答代码 {}, 原因 {}, 路由 {}", new String(returned.getMessage().getBody()), returned.getExchange(), returned.getReplyCode(), returned.getReplyText(), returned.getRoutingKey()); //TODO 消息回退,可以进行对应的业务处理 } }
2.3 使用 Postman 进行测试
发送成功不会打印对应日志,只有当消息发送失败时才进行回退,才会打印对应日志。
发送失败的情况,将路由的名称改为不存在的,控制台输出如下:
在 RabbitMQ 中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。
这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。消息分为 自动确认 和 手动确认。
自动确认,指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。
手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。
自动确认:spring.rabbitmq.listener.simple.acknowledge-mode = none
手动确认:spring.rabbitmq.listener.simple.acknowledge-mode = manual
注意:此模式的配置要配置在消费端
3.1 消费者添加配置
spring: rabbitmq: listener: simple: # 开启手动确认模式 acknowledge-mode: manual
3.2 手动确认模式的消费者
/** * Description: 手动确认模式的消费者 */ @Component public class AckConsumer { @RabbitListener(queues = "bootQueue") public void listenMsg(Message message, Channel channel) throws IOException { //消息投递号,每次投递消息该值都会加1 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { int i = 1/0; System.out.println("成功接收到消息:" + message); /** * 手动签收消息 * deliveryTag: 消息投递号 * true: 是否可以一次签收多条消息 */ channel.basicAck(deliveryTag, true); } catch (Exception e) { System.out.println("消息接收失败~!"); /** * 拒签消息,设置消息重回队列中 * deliveryTag: 消息投递号 * true: 是否可以一次拒签多条消息 * true: 拒签后消息是否重回队列,不放回消息则会放到死信队列 */ channel.basicNack(deliveryTag, true, true); } } }
注意:上面代码由于模拟运行时异常,拒签后重新放回队列中,然后重新执行,所以会一直输出"消息接收失败~!"
假想我们 RabbtiMQ 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,巨量的消息瞬间全部喷涌推动过来,但是单个客户端无法同时处理这么多条数据,就会被压垮崩溃。
RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量)服务质量保证功能。即在非自动确认消息的前提下,如果一定数目的消息未被确认之前,不再进行消费新的消息。
我们可以通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
拓展:预取值(Prefetch)
在 RabbitMQ 中,预取值(Prefetch Count)是指消费者从队列中预取的消息数量。当一个消费者连接到一个队列并开始消费消息时,它可以通过设置预取值来控制一次从队列中获取的消息数量。预取值可以在消费者创建时进行设置,也可以在运行时进行更改。
预取值的主要作用是控制消费者的负载,避免一个消费者在处理消息时占用过多的资源,导致其他消费者无法获得足够的资源。通过限制每次预取的消息数量,可以控制消费者的处理速度,避免过度消费队列中的消息。
预取值的设置方式有两种:
全局设置:通过 channel.basicQos(prefetchCount) 方法设置全局预取值。在这种情况下,所有的消费者都将使用相同的预取值。
单独设置:通过 channel.basicConsume(queue, consumer) 方法的 prefetchCount 参数设置单独的预取值。在这种情况下,每个消费者都可以使用不同的预取值。
需要注意的是,预取值并不是绝对的,它只是一个提示值。当消费者处理完预取值数量的消息后,它可以继续从队列中获取更多的消息,不受预取值的限制。同时,当队列中的消息数量少于预取值时,消费者将无法获取更多的消息,直到队列中有新的消息可用。因此,预取值的设置应该根据实际情况进行调整,以保证消费者的负载均衡和队列的稳定性
1.1 消费端配置限流机制
spring: rabbitmq: listener: simple: # 开启手动确认模式(消费端限流必须开启) acknowledge-mode: manual # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息 prefetch: 5
1.2 生产者发送多条消息
@Test public void testSendBatchMsg() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("topicExchange", "message", i + " 快来参加京东618活动哦~~~"); } }
1.3 启动消费者监听队列
/** * Description: 消费端限流的消费者 */ @Component @Slf4j public class LimitConsumer { @RabbitListener(queues = "bootQueue") public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException { // 1.获取消息 String msg = new String(message.getBody()); log.info("接收到的消息为: {}", msg); // 2.模拟业务处理 Thread.sleep(3000); // 3.手动签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE); } }
1.4 消费过程中管控台如下:
1.5 消费结果如下:
在 RabbitMQ 中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1 处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1 有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
2.1 添加消费端配置
spring: rabbitmq: listener: simple: # 开启手动确认模式(消费端限流必须开启) acknowledge-mode: manual # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息 # prefetch: 5 # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发 prefetch: 1
2.2 生产者发送多条消息
与上面一致即可
2.3 添加两个效率不一样的消费者
/** * Description: 不公平分发消费者 */ @Component @Slf4j public class UnfairConsumer { @RabbitListener(queues = "bootQueue") public void listenMsgOne(Message message, Channel channel) throws InterruptedException, IOException { // 1.获取消息 String msg = new String(message.getBody()); log.info("消费者 1 接收到的消息为: {}", msg); // 2.模拟业务处理, 快 Thread.sleep(500); // 3.手动签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE); } @RabbitListener(queues = "bootQueue") public void listenMsgTwo(Message message, Channel channel) throws InterruptedException, IOException { // 1.获取消息 String msg = new String(message.getBody()); log.info("消费者 2 接收到的消息为: {}", msg); // 2.模拟业务处理, 慢 Thread.sleep(3000); // 3.手动签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE); } }
2.4 正常情况下的消费结果
2.5 利用消费端限流实现不公平分发后
注意:
正常情况下,当生产者一次性向队列中发送多条消息后,如果设置了预取值(即限流),每个消费者会预先"占有"相应预取值数量的消息条数(可以理解为缓存),然后每消费一条会去队列中取到消息来填充预取值,直到消息被消费完。
RabbitMQ 可以设置消息的存活时间(Time To Live,简称TTL),单位是毫秒,当消息到达存活时间后还没有被消费,会被移出队列。
RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。具体使用如下:
消息到达存活时间未被消费时,消息会被放入死信队列。
3.1 对队列的所有消息设置存活时间
在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中):
/** * 创建有存活时间的消息队列 */ @Bean("ttlQueue") public Queue getTtlQueue() { return QueueBuilder .durable("ttlQueue") //10s消息过期 .ttl(10000) .build(); } /** * 绑定带有存活时间的队列 */ @Bean public Binding bingTtlQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("ttlQueue") Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("ttl") .noargs(); }
下面则正常生产者发送消息,然后等待10s后,消息会消失。此处不再展示测试代码。
3.2 对某条消息设置存活时间
@Test public void testSendMsgWithTtl() { //设置消息属性 MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("10000"); //创建消息对象 Message message = new Message("测试发送带有过期时间的消息~~~".getBytes(StandardCharsets.UTF_8), messageProperties); //发送消息 rabbitTemplate.convertAndSend("topicExchange", "message", message); }
注意:
可以在以上的案例中,加入一个循环发送消息,在 i 等于中间某一个数字时(例如 i = 3),设置此条消息有过期时间,其他则正常发送消息。通过结果可以得知,10s过后,消息并没有被马上移除,但该消息已经不会被消费了,当他到达队列顶端时才会被移除。此处不再展示测试代码。
RabbitMQ 优先级队列(Priority Queue)是一种特殊的队列,它根据消息的优先级将其放置在队列中。当消费者从队列中获取消息时,它将按照优先级从高到低的顺序获取消息。优先级队列可以用于处理一些需要按照优先级处理的消息,例如日志记录、任务调度等。具体使用如下:
4.1 创建消息队列及绑定交换机
在 RabbitMQ 配置类中添加以下配置(原文件代码在上面整合案例中):
/** * 创建有优先级的消息队列 */ @Bean("priorityQueue") public Queue getPriorityQueue() { return QueueBuilder .durable("priorityQueue") //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源 .maxPriority(10) .build(); } /** * 绑定带有优先级的队列 */ @Bean public Binding bingPriorityQueue(@Qualifier("topicExchange") Exchange exchange, @Qualifier("priorityQueue") Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with("priority") .noargs(); }
4.2 生产者代码
@Test public void testPriority() { for (int i = 0; i < 50; i++) { //i为5的倍数时优先级较高 if (i % 5 == 0) { MessageProperties messageProperties = new MessageProperties(); //设置优先级 messageProperties.setPriority(9); //创建消息对象 Message message = new Message((i + " 测试发送带有优先级的消息~~~").getBytes(StandardCharsets.UTF_8), messageProperties); //发送消息 rabbitTemplate.convertAndSend("topicExchange", "priority", message); } else { rabbitTemplate.convertAndSend("topicExchange", "priority", i + " 普通级别的消息~~~"); } } }
4.3 消费者代码
/** * Description: 监听具有优先级的队列消息 */ @Component @Slf4j public class PriorityConsumer { @RabbitListener(queues = "priorityQueue") public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException { // 1.获取消息 String msg = new String(message.getBody()); log.info("接收到的消息为: {}", msg); Thread.sleep(2000); // 2.手动签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE); } }
4.4 管控台如下
在 MQ 中,当消息在队列中由于某些原因没有被及时消费而变成死信(Dead Message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在 RabbitMQ 中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。
**死信交换机 和 死信队列与普通的没有区别。**死信队列只是一种特殊的队列,里面的消息仍然可以消费。
消息成为死信的情况:
实现流程如下以及具体代码如下:
1. 在 RabbitMQ 的配置类中添加如下配置,重点在于创建普通队列时绑定死信交换机(即代码的59-62行)
public static final String DEAD_EXCHANGE_NAME = "deadExchange"; public static final String DEAD_QUEUE_NAME = "deadQueue"; public static final String NORMAL_EXCHANGE_NAME = "normalExchange"; public static final String NORMAL_QUEUE_NAME = "normalQueue"; /** * 创建死信交换机,与普通交换机一样 */ @Bean("deadExchange") public Exchange getDeadExchange() { return ExchangeBuilder .topicExchange(DEAD_EXCHANGE_NAME) .durable(true) .build(); } /** * 创建死信队列,与普通队列一样 */ @Bean("deadQueue") public Queue getDeadQueue() { return QueueBuilder .durable(DEAD_QUEUE_NAME) .build(); } /** * 死信交换机绑定死信队列 */ @Bean public Binding bingDeadQueue(@Qualifier("deadExchange") Exchange exchange, @Qualifier("deadQueue") Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("dead_route") .noargs(); } /** * 创建普通交换机 */ @Bean("normalExchange") public Exchange getNormalExchange() { return ExchangeBuilder .topicExchange(NORMAL_EXCHANGE_NAME) .durable(true) .build(); } /** * 创建普通队列 */ @Bean("normalQueue") public Queue getNormalQueue() { return QueueBuilder .durable(NORMAL_QUEUE_NAME) // 绑定死信交换机 .deadLetterExchange(DEAD_EXCHANGE_NAME) // 死信队列路由关键字 .deadLetterRoutingKey("dead_route") // 消息存活10s(此队列消息过期会变成死信) .ttl(10000) // 队列最大长度为10(此队列的长度大于10会变成死信) .maxLength(10) .build(); } /** * 普通交换机绑定普通队列 */ @Bean public Binding bingNormalQueue(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue){ return BindingBuilder .bind(queue) .to(exchange) .with("normal_route") .noargs(); }
2. 分别测试三种变为死信消息的情况
@Test public void testDLX(){ // 1.存活时间过期后变成死信 rabbitTemplate.convertAndSend("normalExchange","normal_route","测试消息过期,消息会成为死信~~~"); // 2.超过队列长度后变成死信 for (int i = 0; i < 15; i++) { rabbitTemplate.convertAndSend("normalExchange","normal_route","测试超过队列长度,消息会成为死信~~~"); } // 3.消息拒签但不返回原队列后变成死信 rabbitTemplate.convertAndSend("normalExchange","normal_route","测试消费者拒签并且不放回队列,消息会成为死信~~~"); }
第三种情况消费者代码为:
@Component public class DLXConsumer { @RabbitListener(queues ="normalQueue") public void listenMessage(Message message, Channel channel) throws IOException { // 拒签消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false); } }
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
但 RabbitMQ 中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
延迟队列:TTL + 死信队列的合体
我们可以使用死信队列来实现延迟队列的功能,大致代码同上面死信队列代码一致,不过我们需要加一个专门监听死信队列的消费者,将超时的消息处理掉即可。
@Component @Slf4j public class DeadConsumer { @RabbitListener(queues = "deadQueue") public void receiveD(Message message, Channel channel){ String msg = new String(message.getBody()); log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg); } }
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。可以参考以下流程图:
RabbitMQ 虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。可以参考以下流程图:
2.1 安装延迟队列插件
# 先使用xftp将插件上传至虚拟机 # 将插件放入RabbitMQ插件目录中(也可以使用xftp上传至插件目录) [root@localhost opt]# mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/ # 启用插件 [root@localhost opt]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 重启rabbitmq [root@localhost opt]# rabbitmqctl stop [root@localhost opt]# rabbitmq-server restart -detached
2.2 查看管控台交换机类型
2.3 创建延迟交换机和延迟队列
/** * Description: 延迟队列rabbitmq配置类 */ @Configuration public class DelayedRabbitConfig { public static final String DELAYED_EXCHANGE = "delayed_exchange"; public static final String DELAYED_QUEUE = "delayed_queue"; public static final String DELAYED_ROUTE_KEY = "delayed_route"; /** * 创建延迟交换机 */ @Bean("delayed_exchange") public Exchange getDelayedExchange() { // 创建自定义交换机 Mapargs = new HashMap<>(1); // topic类型的延迟交换机 args.put("x-delayed-type", "topic"); /** * 参数1: 交换机名称 * 参数2: 交换机类型(x-delayed-message代表延迟交换机) * 参数3: 是否持久化 * 参数4: 是否自动删除 * 参数5: 额外参数 */ return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args); } /** * 创建延迟队列 */ @Bean("delayed_queue") public Queue getDelayedQueue() { return QueueBuilder .durable(DELAYED_QUEUE) .build(); } /** * 绑定延迟队列 */ @Bean public Binding bindingDelayedQueue(@Qualifier("delayed_queue") Queue queue, @Qualifier("delayed_exchange") Exchange exchange) { return BindingBuilder .bind(queue) .to(exchange) .with(DELAYED_ROUTE_KEY) .noargs(); } }
2.4 生产者代码
@GetMapping("sendDelayed") public void sendDelayed(String message, Integer delayedTime){ log.info("当前时间:{},发送一条过期时间{}信息给delayed交换机:{}", new Date(),delayedTime, message); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(delayedTime); return message; } }; rabbitTemplate.convertAndSend("delayed_exchange", "delayed_route", message, messagePostProcessor); //另一种写法 rabbitTemplate.convertAndSend("delayed_exchange", "delayed_route",message, msg -> { //设置延迟时间 msg.getMessageProperties().setDelay(delayedTime); return msg; }); }
2.5 消费者代码
/** * Description: 延迟队列消费者 */ @Component @Slf4j public class DelayedConsumer { @RabbitListener(queues = "delayed_queue") public void receive(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间{},收到延时交换机的消息:{}", new Date(), msg); } }