第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。 MQ(消息队列)在微服务、业务活动等场景中的应用主要表现为系统解耦和削峰。
系统解耦
场景描述:在微服务架构中,服务与服务之间需要通信。如果采用直接调用方式,服务间会存在强依赖关系,一个服务的改动可能引发连锁反应。MQ作用:服务间可以通过消息队列进行通信,一个服务将消息放入队列,另一个服务从队列中取出消息进行处理。这种方式下,服务间实现了解耦,降低了相互的依赖。
削峰
场景描述:在业务活动期间,由于用户请求量短时间内剧增,可能导致系统压力过大甚至崩溃。MQ作用:通过消息队列实现请求的缓冲。在高并发场景下,系统可以将请求放入消息队列,然后异步处理这些请求,从而平滑系统的处理负载,确保系统的稳定性。
综上所述,MQ因其独特的队列属性和消息传递模式,在分布式、微服务架构中发挥着重要的作用,提高了系统的可用性和稳定性。
RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
Queue:队列,是RabbitMQ的内部对象,用于存储消息
Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
自学参考:https://blog.csdn.net/qq_38550836/article/details/95358353
无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe
配置环境变量
cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin
开启rabbitmq-plugins插件
rabbitmq-plugins enable rabbitmq_management
打开地址
http://127.0.0.1:15672/
输入用户名/密码:guest/guest
org.springframework.boot spring-boot-starter-amqp
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
package com.xxxx.user.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { /******************direct**********************/ /** * 创建direct队列 * @return */ @Bean public Queue directQueue(){ return new Queue("directQueue"); } /** * 创建direct交换机 * @return */ @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } /** * 把队列和交换机绑定在一起 * @param queue * @param directExchange * @return */ @Bean public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("routingKey"); } /******************topic**********************/ @Bean public Queue topicQuerue1(){ return new Queue("topicQuerue1"); } @Bean public Queue topicQuerue2(){ return new Queue("topicQuerue2"); } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){ return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1"); } /** * 通配符:* 表示一个词,# 表示零个或多个词 * @param queue * @param topicExchange * @return */ @Bean public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){ return BindingBuilder.bind(queue).to(topicExchange).with("topic.#"); } /******************fanout**********************/ @Bean public Queue fanoutQueue1(){ return new Queue("fanoutQueue1"); } @Bean public Queue fanoutQueue2(){ return new Queue("fanoutQueue2"); } @Bean public Queue fanoutQueue3(){ return new Queue("fanoutQueue3"); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } @Bean public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
package com.xxxx.user.consumer; import com.xxxx.drp.common.entity.UserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = "directQueue") public class DataDirectReceiver { @RabbitHandler public void process(String data){ log.info("收到directQueue队列信息:" + data); } @RabbitHandler public void process(UserInfo data){ log.info("收到directQueue队列信息:" + data); } }
package com.xxxx.user.consumer; import com.xxxx.common.entity.UserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = {"topicQuerue1","topicQuerue2"}) public class DataFanoutReceiver { @RabbitHandler public void process(String data){ log.info("收到topicQuerue队列信息:" + data); } @RabbitHandler public void process(UserInfo data){ log.info("收到topicQuerue队列信息:" + data); } }
package com.xxxx.user.consumer; import com.xxxx.common.entity.UserInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"}) public class DataTopicReceiver { @RabbitHandler public void process(String data){ log.info("收到topicQuerue队列信息:" + data); } @RabbitHandler public void process(UserInfo data){ log.info("收到topicQuerue队列信息:" + data); } }
package com.xxxx.user; import com.xxxx.common.entity.UserInfo; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class DataSender { @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendDirect(){ UserInfo userInfo = new UserInfo(); userInfo.setUserAccount("tiger"); userInfo.setPassword("12345"); this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo); } @Test public void sendTopic(){ this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic"); } @Test public void sendFanout(){ this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic"); } }
MQ因其独特的队列属性和消息传递模式,在分布式、微服务架构中发挥着重要的作用,提高了系统的可用性和稳定性。人工智能AI编程知识库