一、RabbitMQ 部署及配置详解(集群部署)
二、RabbitMQ 部署及配置详解 (单机)
三、RabbitMQ 详解及实例(含错误信息处理)
四、RabbitMq死信队列及其处理方案
五、RabbitMQ Java开发教程—官方原版
六、RabbitMQ Java开发教程(二)—官方原版
七、RabbitMQ Java开发教程(三)—官方原版_
Producer: 消息的生产者,用于发布消息;
Consumer: 消息的消费者,用于从队列中获取消息.消费者只需关注队列即可,不需要关注交换机和路由键。消费者可以通过basicConsume(订阅模式可以从队列中一直持续的自动的接收消息)或者basicGet(先订阅消息,然后获取单条消息,再然后取消订阅。也就是说basicGet一次只能获取一条消息,如果还想再获取下一条还要再次调用basicGet)来从队列中获取消息。
RabbitMQ broker: 官方定义"RabbitMQ isn’t a food truck, it’s a delivery service",指明RabbitMQ是一种传输服务。
Exchange: 生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去。ExchangeType决定了Exchange路由消息的行为,在RabbitMQ中,ExchangeType有direct、Fanout、Topic和Header 4种。Exchange 类似于数据通信网络中的交换机,提供消息路由策略。
在RabbitMQ 中,Producer 不是通过信道直接将消息发送给 Queue,而是先发送给 ExChange. 一个 ExChange 可以和多个 Queue 进行绑定,Producer 在传递消息的时候,会传递一个 ROUTING_KEY, ExChange 会根据这个 ROUTING_KEY 按照特定的路由算法,将消息路由给指定的 Message Queue。与 Queue 一样, ExChange 也可设置为持久化,临时或者自动删除。
所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来,所以Binding不是一个概念,而是一种操作。RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来()Exchange—>Routing Key—>Queue,这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind()方法将Exchange、Routing Key、Queue绑定起来.ExChange 和 Queue 的绑定可以是多对多的关系。
Binding Key: 它表示的是Exchange与Message Queue是通过binding key进行绑定联系的,这个关系是固定的。初始化的时候,我们就会建立该队列。
Routing Key: 它是一个String值,用于定义路由规则。生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。在队列绑定的时候需要指定路由键,在生产者发布消息的时候需要指定路由键,当消息的路由键和队列绑定的路由键匹配时,消息就会发送到该队列。
用于存储消息的容器,可以看成一个有序的数组,生产者生产的消息会发送到交换机中,最终交换机将消息存储到某个或某些队列中。队列可被消费者订阅,消费者从订阅的队列中获取消息。
Message Queue: 消息队列,我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取.
消息队列提供了 FIFO 的处理机制,具有缓存消息的能力.在RabbitMQ 中,队列消息可以设置为持久化,临时或者自动删除.
设置为持久化的队列,Queue 中的消息会在 Server 本地硬盘存储一份,防止系统 Crash,数据丢失;
设置为临时的队列,Queue 中的数据在系统重启之后就会丢失;
设置为自动删除的队列,当没有用户连接到 Server,队列中的数据会被自动删除。
每一个RabbitMQ服务器都能创建多个虚拟消息服务器,我们称之为虚拟主机.每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的交换机、队列、绑定等,拥有自己的权限机制.vhost相对于RabbitMQ就像虚拟机之于物理机一样.他们通过在各个实例间提供逻辑上的分离,允许不同的应用程序安全保密的运行数据,这很有用,它既能将同一个Rabbit的众多客户区分开来,又可以避免队列和交换器的命名冲突.RabbitMQ提供了开箱即用的默认的虚拟主机“/”,如果不需要多个vhost可以直接使用这个默认的vhost,通过使用缺省的guest用户名和guest密码来访问默认的vhost.
vhost之间是相互独立的,这避免了各种命名的冲突,就像App中的沙盒的概念一样,每个沙盒是相互独立的,且只能访问自己的沙盒,以保证非法访问别的沙盒带来的安全隐患.
rabbitmq部署有三种模式:单机模式,普通集群模式,镜像集群模式
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现
由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。
wget http://erlang.org/download/otp_src_20.0.tar.gz tar -zxvf otp_src_20.0.tar.gz cd otp_src_20.0 ./configure --prefix=/usr/local/erlang --without-javac make make install
加入环境变量
vim /etc/profile 在最后添加: export ERLANG_HOME=/usr/local/erlang export PATH=$PATH:/usr/local/erlang/bin
运行以下命令,使环境变量立即生效
source /etc/profile
验证erl是否成功安装
erl
安装 rabbitmq
可以去官网手动下载(Downloading and Installing RabbitMQ — RabbitMQ)也可以通过wget命令下载
cd /opt wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.6/rabbitmq-server-3.11.6.tar.xz xz -d rabbitmq-server-generic-unix-3.11.6.tar.xz tar -xvf rabbitmq-server-generic-unix-3.6.10.tar cp -r /opt/rabbitmq_server-3.6.10 /usr/local/rabbitmq
注:该方式安装的RabbitMQ是没有配置文件的,如需要配置文件,需手动进行配置,文件置于自己Rabbitmq安装路径下的 /etc/rabbitmq/rabbitmq.conf 即可,再管理页面或者日志中都可以查看到路径位置
/usr/local/rabbitmq/sbin/rabbitmq-server -detached echo "/usr/local/rabbitmq/sbin/rabbitmq-server -detached" >> /etc/rc.local #添加开机自启动。
开启 web 管理界面
/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management
RabbitMQ默认的账号用户名和密码都是guest。
rabbitmqctl delete_user guest
创建RabbitMQ管理员用户
#创建一个新用户 rabbitmqctl add_user <用户名> <密码> #将创建的新用户设置为管理员 rabbitmqctl set_user_tags <用户名> administrator #赋予新创建的用户所有权限 rabbitmqctl set_permissions -p / <用户名> ".*" ".*" ".*"
在本地主机中,使用浏览器访问Linux实例的公网IP:15672。
显示如下页面,说明RabbitMQ安装成功。
在多台机器上启动多个rabbitmq实例,每个机器启动一个。
但是你创建的queue,只会放在一个rabbtimq实例上,但是每个实例都同步queue的元数据(存放含queue数据的真正实例位置)。消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。
所有 受支持的RabbitMQ版本 的主配置文件都使用 类似ini的sysctl配置文件格式。该文件通常命名为Rabbitmq.conf。
rabbitmq.conf | 新样式格式(sysctl或类似ini的格式) | 主配置文件。应该用于大多数设置。对于人类来说,阅读和机器(部署工具)的生成更容易。并非所有设置都可以这种格式表示。 |
advanced.config | 经典(Erlang术语) | 不能以新样式配置格式表示的有限数量的设置,例如 LDAP查询。仅在必要时使用。 |
rabbitmq-env.conf | 环境变量对 | 用于在一处设置与RabbitMQ相关的 环境变量。 |
org.springframework.boot spring-boot-starter-amqp
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /*rabbitMq基本配置*/ @Configuration public class RabbitMqConfig { public final static String productExchange = "productExchange "; private String productQueue = "productQueue"; public final static String ERROR_EXCHANGE = "le.error.exchange"; private String topicExchangeName = ""; @Bean public FanoutExchange productExchange() { return new FanoutExchange(productExchange); } public TopicExchange topicExchange() { return new TopicExchange(topicExchangeName); } @Bean public DirectExchange errorExchang() { return new DirectExchange(ERROR_EXCHANGE, true, false); } @Bean public Queue productQueue() { return new Queue(productQueue, true); } @Bean public Binding productBinding(Queue productQueue, FanoutExchange productExchange) { return BindingBuilder.bind(productQueue).to(productExchange); } }
@Component public class ProductProducer { Logger logger = LoggerFactory.getLogger(ProductProducer.class); @Autowired RabbitTemplate rabbitTemplate; @Autowired Gson gson; public void push(ProfitEntity profitEntity) { String json = gson.toJson(""); logger.info("###消息发送者: json=" + json); rabbitTemplate.convertAndSend(RabbitmqConfig.Product_EXCHANGE, null, json); } }
@Component public class TransProductConsumer { Logger logger = LoggerFactory.getLogger(TransproductConsumer.class); @Autowired RabbitTemplate rabbitTemplate; @Autowired ErrorProducer errorProducer; @Autowired ProductService productService; @RabbitHandler @RabbitListener(queues = RabbitmqConfig.productQueue ) public void comsume(String json) { try { logger.info("##start##消费者:TransProductConsumer ,接收到消息:json=" + json); } catch (Exception e) { logger.error("计算消息队列出错 zf.Product.caculate.queue ", e); errorProducer.push(RabbitmqConfig.ERROR_PROFIT_CACULATE_QUEUE, json); } } }
@Component public class ErrorProducer { Logger logger = LoggerFactory.getLogger(TransDetailProducer.class); @Autowired RabbitTemplate rabbitTemplate; @Autowired Gson gson; public void push(String queueName, String errorJson) { logger.info("###异常消息发送者: queueName=" + queueName + " errorJson=" + errorJson); rabbitTemplate.convertAndSend(RabbitmqConfig.ERROR_EXCHANGE, queueName, errorJson); } public void push(String queueName, Object obj) { String errorJson = gson.toJson(obj); logger.info("###异常消息发送者: queueName=" + queueName + " errorJson=" + errorJson); rabbitTemplate.convertAndSend(RabbitmqConfig.ERROR_EXCHANGE, queueName, errorJson); } }