RabbitMQ
# RabbitMQ
# RabbitMQ入门
# RabbitMQ安装和使用
# 一、安装依赖环境
- 1.在 http://www.rabbitmq.com/which-erlang.html (opens new window) 页面查看安装rabbitmq需要安装erlang对应的版本
- 2.在 https://github.com/rabbitmq/erlang-rpm/releases (opens new window) 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。
- 3.复制下载地址后,使用wget命令下载
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
- 4.安装 Erlang
sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
- 5.安装 socat
sudo yum install -y socat
# 二、安装RabbitMQ
- 1.在官方下载页面找到CentOS7版本的下载链接,下载rpm安装包
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags (opens new window)下载历史版本
- 2.安装RabbitMQ
sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
# 三、启动和关闭
- 启动服务
sudo systemctl start rabbitmq-server
- 查看状态
sudo systemctl status rabbitmq-server
- 停止服务
sudo systemctl stop rabbitmq-server
- 设置开机启动
sudo systemctl enable rabbitmq-server
# 四、开启Web管理插件
- 1.开启插件
rabbitmq-plugins enable rabbitmq_management
说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。
- 2.添加用户
rabbitmqctl add_user admin admin
- 3.为用户分配操作权限
rabbitmqctl set_user_tags admin administrator
- 4.为用户分配资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 五、防火墙添加端口
RabbitMQ 服务启动后,还不能进行外部通信,需要将端口添加都防火墙
- 1.添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
2
3
4
- 2.重启防火墙
sudo firewall-cmd --reload
# 多机多节点集群部署
# [一、 环境准备](#一、 环境准备)
- 准备两台安装好RabbitMQ 的机器,安装方法见 安装步骤
- 192.168.0.105
- 192.168.0.106
提示:如果使用虚拟机,可以在一台VM上安装好RabbitMQ后,创建快照,从快照创建链接克隆,会节省很多磁盘空间
# 二、修改配置文件
- 1.修改主机名
修改/etc/hostname 192.168.0.105 修改为 node1 192.168.0.106 修改为 node2
或者 分别在两台主机执行命令
hostnamectl set-hostname node1
hostnamectl set-hostname node2
2
修改完毕后用远程工具重新连接
- 2.修改192.168.0.105机器上的/etc/hosts文件
sudo vim /etc/hosts
- 3.添加IP和节点名
192.168.0.105 node1
192.168.0.106 node2
2
- 4.将192.168.0.105上的hosts文件复制到另外一台机器上
sudo scp /etc/hosts root@node2:/etc/
说明:命令中的root是目标机器的用户名,命令执行后,可能会提示需要输入密码,输入对应用户的密码就行了
5.将192.168.0.105上的/var/lib/rabbitmq/.erlang.cookie文件复制到另外两台机器上
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
提示:如果是通过克隆的VM,可以省略这一步
# 三、防火墙添加端口
给每台机器的防火墙添加端口
- 1.添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
2
3
4
- 2.重启防火墙
sudo firewall-cmd --reload
# 四、启动RabbitMQ
- 1.启动每台机器的RabbitMQ
sudo systemctl start rabbitmq-server
或者
rabbitmq-server -detached
- 2.将192.168.0.105加入到集群
# 停止RabbitMQ 应用
rabbitmqctl stop_app
# 重置RabbitMQ 设置 (可以不操作!!!)
rabbitmqctl reset
# 加入到集群
rabbitmqctl join_cluster rabbit@node1 --ram
# 启动RabbitMQ 应用
rabbitmqctl start_app
2
3
4
5
6
7
8
9
10
11
- 3.查看集群状态,看到running_nodes,[rabbit@node1,rabbit@node2]表示节点启动成功
rabbitmqctl cluster_status
# 单机多节点部署
# 一、环境准备
- 准备一台已经安装好RabbitMQ 的机器,安装方法见 安装步骤
# 二、启动RabbitMQ
- 1.在启动前,先修改RabbitMQ 的默认节点名(非必要),在/etc/rabbitmq/rabbitmq-env.conf增加以下内容
# RabbitMQ 默认节点名,默认是rabbit
NODENAME=rabbit1
2
- 2.RabbitMQ 默认是使用服务的启动的,单机多节点时需要改为手动启动,先停止运行中的RabbitMQ 服务
sudo systemctl stop rabbitmq-server
- 3.启动第一个节点
rabbitmq-server -detached
- 4.启动第二个节点
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
2
- 5.将rabbit2加入到集群
# 停止 rabbit2 的应用
rabbitmqctl -n rabbit2 stop_app
# 重置 rabbit2 的设置
rabbitmqctl -n rabbit2 reset
# rabbit2 节点加入到 rabbit1的集群中
rabbitmqctl -n rabbit2 join_cluster rabbit1 --ram
# 启动 rabbit2 节点
rabbitmqctl -n rabbit2 start_app
2
3
4
5
6
7
8
9
10
11
- 6.查看集群状态,看到{running_nodes,[rabbit3@node1,rabbit2@node1,rabbit1@node1]}说明节点已启动成功。
rabbitmqctl cluster_status
提示:在管理界面可以更直观的看到集群信息
# 三、防火墙添加端口
需要将每个节点的端口都添加到防火墙
- 1.添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5673/tcp --permanent
sudo firewall-cmd --zone=public --add-port=25673/tcp --permanent
sudo firewall-cmd --zone=public --add-port=15673/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5674/tcp --permanent
sudo firewall-cmd --zone=public --add-port=25674/tcp --permanent
sudo firewall-cmd --zone=public --add-port=15674/tcp --permanent
2
3
4
5
6
7
8
9
10
- 2.重启防火墙
sudo firewall-cmd --reload
# 镜像队列模式集群
- 镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments (opens new window)
- 通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群
- 开启镜像队列模式需要在管理页面添加策略,添加方式:
进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy
b.在表单中填入:
name: ha-all Pattern: ^ Apply to: Queues Priority: 0 Definition: ha-mode = all1
2
3
4
5参数说明 name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息 Apply to:策略应用到什么对象上 Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式) Priority:优先级,数值越大,优先级越高,相同优先级取最后一个 Definition:策略定义的类容,对于镜像队列的配置来说,只需要包含3个部分: ha-mode 、ha-params 和 ha-sync-mode。其中,ha-sync-mode是同步的方式,自动还是手动,默认是自动。ha-mode 和 ha-params 组合使用。组合方式如下:
| ha-mode | ha-params | 说明 |
|---|---|---|
| all | (empty) | 队列镜像到集群类所有节点 |
| exactly | count | 队列镜像到集群内指定数量的节点。如果集群内节点数少于此值,队列将会镜像到所有节点。如果大于此值,而且一个包含镜像的节点停止,则新的镜像不会在其它节点创建。 |
| nodes | nodename | 队列镜像到指定节点,指定的节点不在集群中不会报错。当队列申明时,如果指定的节点不在线,则队列会被创建在客户端所连接的节点上。 |
- 镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式
- 但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。
# RabbitMQ持久化机制
# RabbitMQ持久化
RabbitMQ的持久化分为队列持久化,消息持久化和交换器持久化,不管是非持久化和持久化的消息都可以被写入磁盘。

- 队列持久化 队列的持久化是在定义队列时的durable参数来实现的,durable为true时,队列才会持久化。
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //第二个参数设置为true,即durable=true channel.queueDeclare("queue1",true,false,false,null);1
2
3
4
- 消息持久化 消息持久化通过消息的属性deliveryMode来设置是否持久化,在发送消息时通过b asicPublish的参数传入。
//通过传入MessageProperties.PERSISTENT_TEXT_PLAIN就可以实现消息持久化 channel.basicPublish("","queue1",MessageProperties.PERSISTENT_TEXT_PLAIN,persistent_test_message.getBytes());1
2交换器持久化 同队列一样,交换器也需要在定义时设置持久化标识,否则在Broker重启后将丢失
//durable 为true则开启持久化
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable) throws IOE xception;
2
# RabbitMQ内存控制
内存告警
当内存使用超过配置的阈值或者磁盘剩余空间低于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并停止接收从客户端发来的消息,以此避免服务崩溃,客户端与服务端的心跳检测也会失效。

当出现内存告警时,可以通过管理命令临时调整内存大小,也可以通过修改配置文件
- 命令
rabbitmqctl set_vm_memory_high_watermark <fraction>1fraction 为内存闽值,RabbitMQ默认值为0.4表示当RabbitMQ使用的内存超过40%时,就会产生告警并阻塞所有生产者连接。 通过此命令修改的阈值在Broker重启后将会失效,通过修改配置文件的方式设置的阈值则不会在重启后消失,但需要重启Broker才会生效。
- 配置文件 配置文件地址:/etc/rabbitmq/rabbitmq.conf
vm_memory_high_watermark.relative = 0.4 #vm_memory_high_watermark.absolute = 1GB1
2RabbitMQ提供relative或absolute两种配置方式 relative 相对值,即前面的fraction,建议取值在0.4~0.66之间,不建议超过0.7 absolute绝对值,单位为KB、MB、GB,对应的命令是
rabbitmqctl set_vm_memory_high_watermark absolute <value>1
内存换页
在某个Broker节点触及内存并阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间。持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一份副本,这里会将持久化的消息从内存中清除掉。
默认情况下,在内存到达内存阀值的50%时会进行换页动作。 也就是说,在默认的内存阈值为0.4的情况下,当内存超过0.4×0.5=0.2时会进行换页动作。
可以通过在配置文件中配置vm_memory_high_watermark_paging_ratio项来修 改此值
- 以下配置将会在RabbitMQ内存使用率达到30%时进行换页动作,并在40%时阻塞生产者。 当vm_memory_high_watermark_paging_ratio的值大于1时,相当于禁用了换页功能。
vm_memory_high_watermark.relative=0.4 vm_memory_high_watermark_paging_ratio = 0.751
2

# RabbitMQ磁盘控制
- 当磁盘剩余空间低于确定的阀值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩渍。
默认情况下,磁盘阀值为50MB,表示当磁盘剩余空间低于50MB时会阻塞生产者并停止内存中消息的换页动作。
这个阈值的设置可以减小,但不能完全消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间检测期间内,磁盘空间从大于50MB被耗尽到0MB。
一个相对谨慎的做法是将磁盘阈值设置为与操作系统所显示的内存大小一致。
通过命令可以临时调整磁盘阀值
rabbitmqctl set_disk_free_limit <disk_limit> rabbitmqctl set_disk_free_limit mem_relative <fraction>1
2- disk_limit为固定大小,单位为KB、MB、GB fraction为相对比值,建议的取值为1.0~2.0之间对应的配置如下:
disk_free_limit.relative=2.0 #disk_free_limit.absolute=50mb1
2
# 消息可靠性和插件化机制
# 消息可靠性
- RabbitMQ的消息可靠性,一般是业务系统接入消息中间件时首要考虑的问题,一般通过三个方面保障
- 发送可靠性:确保消息成功发送到Broker
- 存储可靠性:Broker对消息持久化,确保消息不会丢失
- 消费可靠性:确保消息成功被消费

- **RabbitMQ支持其中的“最多一次”和“最少一次”**其中“最少一次”投递实现需要考虑以下这个几个方面的内容:
- “最少一次”
- 消息生产者需要开启事务机制或者publisher confirm机制,以确保消息可以可靠地传输到RabbitMQ中。
- 消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
- “最多一次”的方式就无须考虑以上那些方面,生产者随意发送,不过这样很难确保消息会成功发送。
- “最少一次”
# 发送可靠性
- 基于mq的分布式事务

# 消费可靠性
- 消费者在消费消息的同时,需要将autoAck设置为false,然后通过手动确认的方式去确认己经正确消费的消息,以免在消费端引起不必要的消息丢失。
- 死信队列

# 插件化机制
- RabbitMQ支持插件,通过插件可以扩展多种核心功能:支持多种协议、系统状态监控、其它AMQP 0-9-1交换类型、节点联合等。许多功能都是通过插件实现的。
- RabbitMQ内置一些插件,通过rabbitmq-plugins list命令可以查看插件列表。
- 通过rabbitmq-plugins命令可以启用或禁用插件
rabbitmq-plugins enable plugin-name
rabbitmq-plugins disable plugin-name
2
- 常用插件
