RabbitMQ

RabbitMQ集群部署篇

阅读(738)

一:本文在上一篇<<消息中间件之一:RabbitMQ基础原理篇>> 的基础之上继续对rabbitMQ完成安装部署,命令介绍,rabbitMQ的集群部署,以及通过使用keepalived+haproxy作为反向代理访问rabbitMQ,从而实现负载均衡及高可用,具体配置如下:

192.168.10.101   rabbitmq-server1
192.168.10.102   rabbitmq-server2
192.168.10.103   rabbitmq-server3

1.1.2:各rabbitMQ服务器更改hosts文件:

[root@rabbitmq-server1 ~]# vim /etc/hosts
192.168.10.101   rabbitmq-server1
192.168.10.102   rabbitmq-server2
192.168.10.103   rabbitmq-server3

1.1.3:各rabbitMQ服务器安装并启动rabbitMQ服务:

[root@rabbitmq-server1 ~]# yum  install  rabbitmq-server
[root@rabbitmq-server1 ~]# systemctl  start rabbitmq-server
[root@rabbitmq-server1 ~]# rabbitmq-plugins enable rabbitmq_management #开启web管理界面插件
[root@rabbitmq-server1 ~]# systemctl   restart rabbitmq-server #重启rabbitMQ服务

1.1.4:常用命令:

[root@rabbitmq-server1 ~]# rabbitmqctl  add_vhost xxx #创建vhost
Creating vhost "xxx" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl  list_vhosts #遍历所有vhost信息
Listing vhosts ...
/
xxx
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl  delete_vhost xxx #删除vhost
Deleting vhost "xxx" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl  add_user jack 123456 #添加用户jack密码为123456
Creating user "jack" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl  change_password jack 654321 #更改jack用户的密码为654321
Changing password for user "jack" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl  set_permissions -p xxx jack ".*" ".*" ".*" #绑定jack用户对xxx的vhost的读写权限
Setting permissions for user "jack" in vhost "xxx" ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl  list_queues #列出所有队列
Listing queues ...
...done.

1.1.5:访问web界面:默认用户名为guest密码为guest

二: 部署rabbitMQ集群:

2.1:Rabbitmq的集群是依赖于erlang的集群来工作的,所以必须先构建起erlang的集群环境。而Erlang的集群中各节点是通过一个magic cookie来实现的,这个cookie存放在 /var/lib/rabbitmq/.erlang.cookie 中,文件是400的权限。所以必须保证各节点cookie保持一致,否则节点之间就无法通信。

[root@rabbitmq-server1 ~]# scp /var/lib/rabbitmq/.erlang.cookie  192.168.10.102:/var/lib/rabbitmq/.erlang.cookie 
root@192.168.10.102's password: 
.erlang.cookie                                                  100%   20     0.0KB/s   00:00    
[root@rabbitmq-server1 ~]# scp /var/lib/rabbitmq/.erlang.cookie  192.168.10.103:/var/lib/rabbitmq/.erlang.cookie 
root@192.168.10.103's password: 
.erlang.cookie                                                  100%   20     0.0KB/s   00:00

2.2:各节点重启rabbitMQ服务并验证端口:

2.2.1:各节点启动服务:

[root@rabbitmq-server1 ~]# systemctl  restart rabbitmq-server
[root@rabbitmq-server2 ~]# systemctl  restart rabbitmq-server
[root@rabbitmq-server3 ~]# systemctl  restart rabbitmq-server

2.2.2:验证端口:

5672:消费者访问的 端口
15672web管理端口
25672:集群状态通信端口

2.3:停止所有节点RabbitMq服务,然后使用detached参数独立运行,这步很关键,尤其增加节点停止节点后再次启动遇到无法启动都可以参照这个顺序

[root@rabbitmq-server1 ~]# systemctl  stop  rabbitmq-server
[root@rabbitmq-server2 ~]# systemctl  stop  rabbitmq-server
[root@rabbitmq-server3 ~]# systemctl  stop  rabbitmq-server
[root@rabbitmq-server1 ~]# rabbitmq-server -detached
[root@rabbitmq-server2 ~]# rabbitmq-server -detached
[root@rabbitmq-server3 ~]# rabbitmq-server -detached

2.4:查看各rabbitMQ服务的集群状态:

[root@rabbitmq-server1 ~]# rabbitmqctl  cluster_status
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server1']}]},
 {running_nodes,['rabbit@rabbitmq-server1']},
 {cluster_name,<<"rabbit@rabbitmq-server1">>},
 {partitions,[]}]
...done.

[root@rabbitmq-server2 ~]# rabbitmqctl  cluster_status
Cluster status of node 'rabbit@rabbitmq-server2' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}]},
 {running_nodes,['rabbit@rabbitmq-server2']},
 {cluster_name,<<"rabbit@rabbitmq-server2">>},
 {partitions,[]}]
...done.

[root@rabbitmq-server3 rabbitmq]# rabbitmqctl  cluster_status
Cluster status of node 'rabbit@rabbitmq-server3' ...
[{nodes,[{disc,['rabbit@rabbitmq-server3']}]},
 {running_nodes,['rabbit@rabbitmq-server3']},
 {cluster_name,<<"rabbit@rabbitmq-server3">>},
 {partitions,[]}]
...done.

2.5:将rabbitMQ的节点添加到集群:

2.5.1:在rabbitmq-server1作为内存节点连接起来,并作为内存节点,在rabbitmq-server1执行以下命令:

[root@rabbitmq-server1 ~]# rabbitmqctl  cluster_status
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server1']}]},
 {running_nodes,['rabbit@rabbitmq-server1']},
 {cluster_name,<<"rabbit@rabbitmq-server1">>}, #未添加到集群之前只有自己一台节点
 {partitions,[]}]
...done. 

[root@rabbitmq-server1 ~]# rabbitmqctl  stop_app #停止应程序
[root@rabbitmq-server1 ~]# rabbitmqctl   reset #清空元数据

[root@rabbitmq-server1 ~]# rabbitmqctl  join_cluster rabbit@rabbitmq-server2 --ram #将rabbitmq-server1添加到集群当中,并成为内存节点,不加--ram默认是磁盘节点
Clustering node 'rabbit@rabbitmq-server1' with 'rabbit@rabbitmq-server2' ...
...done.

[root@rabbitmq-server1 ~]# rabbitmqctl   cluster_status
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}, #默认是磁盘节点
         {ram,['rabbit@rabbitmq-server1']}]}]  #内存节点
...done.

[root@rabbitmq-server1 ~]# rabbitmqctl  start_app #不要忘记启动应用程序
Starting node 'rabbit@rabbitmq-server1' ...
...done.
[root@rabbitmq-server1 ~]# rabbitmqctl   cluster_status #添加之后的集群状态
Cluster status of node 'rabbit@rabbitmq-server1' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']}, #默认的是磁盘节点
         {ram,['rabbit@rabbitmq-server1']}]}]  #自己被添加为内存节点
...done.

2.5.2:将rabbitmq-server3添加到集群:

[root@rabbitmq-server3 rabbitmq]# rabbitmqctl  cluster_status #未添加到集群之后只有自己一个节点
Cluster status of node 'rabbit@rabbitmq-server3' ...
[{nodes,[{disc,['rabbit@rabbitmq-server3']}]},
 {running_nodes,['rabbit@rabbitmq-server3']},
 {cluster_name,<<"rabbit@rabbitmq-server3">>},
 {partitions,[]}]
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl  stop_app #停止应用程序
Stopping node 'rabbit@rabbitmq-server3' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl  reset  #重设元数据
Resetting node 'rabbit@rabbitmq-server3' ...
...done.
[root@rabbitmq-server3 rabbitmq]#  rabbitmqctl  join_cluster rabbit@rabbitmq-server2 --ram #添加到集群当中并设置为内存节点
Clustering node 'rabbit@rabbitmq-server3' with 'rabbit@rabbitmq-server2' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl  start_app #驱动应用程序
Starting node 'rabbit@rabbitmq-server3' ...
...done.
[root@rabbitmq-server3 rabbitmq]# rabbitmqctl  cluster_status #查看集群状态
Cluster status of node 'rabbit@rabbitmq-server3' ...
[{nodes,[{disc,['rabbit@rabbitmq-server2']},  #一个磁盘节点,集群当中最少有一个磁盘节点用于消息的持久化
         {ram,['rabbit@rabbitmq-server3','rabbit@rabbitmq-server1']}]}, #两个内存节点
 {running_nodes,['rabbit@rabbitmq-server1','rabbit@rabbitmq-server2', #当前正在运行的节点
                 'rabbit@rabbitmq-server3']},
 {cluster_name,<<"rabbit@rabbitmq-server2">>}, #集群名称
 {partitions,[]}]
...done.

2.6:更改为镜像模式:
2.6.1:只要在其中一台节点执行以下命令即可:

[root@rabbitmq-server1 ~]# rabbitmqctl set_policy  ha-all "#"  '{"ha-mode":"all"}' #"#"为任意0个或多个即为所有,也可以使用"^test"匹配开头,还可以使用其他正则匹配
Setting policy "ha-all" for pattern "#" to "{"ha-mode":"all"}" with priority "0" ...
...done.

2.6.2:在任意一台节点的管理界面创建一个queue,验证是可以同步到其他节点:

2.6.3:在其他rabbitMQ节点进行验证queue是否存在:

2.6.4:使用命令也可以验证:

[root@rabbitmq-server1 ~]# rabbitmqctl  list_queues
Listing queues ...
test	0
test1	0
test2	0
...done.

2.7:python 操作rabbitMQ:

2.7.1:安装pika模块:

# pip3 install pika
# easy_install pika 

2.7.2:生产者代码:

#!/bin/env  python
#coding:utf-8
#Author: ZhangJie
import  pika
#用户名密码
cert = pika.PlainCredentials("guest","guest")
#连接到rabbitMQ服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.101",5672,'/',cert))
#创建频道
chanel = conn.channel()
#声明消息队列,如果队列不存在就创建,存在就将消息在此队队列中创建,如果将消息发送到不存在的队列,则rabbitMQ会自动清除这些消息
chanel.queue_declare(queue="hello")
#exchange告诉消息去往的队列,routing_key是队列名,body是要传递的消息内容
chanel.basic_publish(exchange="",
                     routing_key="hello",
                     body="hello world!")
print("开始队列")
#消息写入完成,关闭连接
conn.close()

2.7.3:消费者代码:

#!/bin/env  python
#coding:utf-8
#Author: ZhangJie
import  pika
#用户名密码
cert = pika.PlainCredentials("guest","guest")
#连接到rabbitMQ服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.101",5672,'/',cert))
#创建频道
chanel = conn.channel()
#声明消息队列,如果队列不存在就创建,存在就将消息在此队队列中创建,如果将消息发送到不存在的队列,则rabbitMQ会自动清除这些消息
chanel.queue_declare(queue="hello")
#exchange告诉消息去往的队列,routing_key是队列名,body是要传递的消息内容
chanel.basic_publish(exchange="",
                     routing_key="hello",
                     body="hello world1!")
print("开始队列")
#消息写入完成,关闭连接
conn.close()

2.7.4:执行结果:

2.7.5:最终的queue截图:

三:通过haproxy 实现反向代理:

3.1:haproxy安装过程省略。主要配置如下:

[root@rabbitmq-server3 ~]# cat  /etc/haproxy/haproxy.cfg 
global
maxconn 100000
uid 99
gid 99
daemon
nbproc 1
log 127.0.0.1 local0 info

defaults
option http-keep-alive
#option  forwardfor
maxconn 100000
mode tcp
timeout connect 300000ms
timeout client  300000ms
timeout server  300000ms

listen rabbitmq 0.0.0.0:5671
mode tcp
log global
balance roundrobin
server rabbitmq1 192.168.10.101:5672 check inter 2000 rise 3 fall 3
server rabbitmq2 192.168.10.102:5672 check inter 2000 rise 3 fall 3
server rabbitmq3 192.168.10.103:5672 check inter 2000 rise 3 fall 3

listen rabbitmq_cluster 0.0.0.0:15671
mode tcp
log global
balance roundrobin
server rqslave1 192.168.10.101:15672 check inter 2000 rise 3 fall 3
server rqslave2 192.168.10.102:15672 check inter 2000 rise 3 fall 3
server rqslave3 192.168.10.102:15672 check inter 2000 rise 3 fall 3

3.2:访问5671端口,验证可以访问到后端真实的rabbitMQ服务器:

3.3:使用python通过haproxy向rabbitMQ写入消息:

3.3.1:写入消息的的生产者代码:

#!/bin/env  python
#coding:utf-8
#Author: ZhangJie
import  pika
#用户名密码
cert = pika.PlainCredentials("guest","guest")
#连接到rabbitMQ服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.103",5671,'/',cert))
#创建频道
chanel = conn.channel()
#声明消息队列,如果队列不存在就创建,存在就将消息在此队队列中创建,如果将消息发送到不存在的队列,则rabbitMQ会自动清除这些消息
chanel.queue_declare(queue="hello")
#exchange告诉消息去往的队列,routing_key是队列名,body是要传递的消息内容
chanel.basic_publish(exchange="",
                     routing_key="test2",
                     body="hello world1!")
print("消息写入成功!")
#消息写入完成,关闭连接
conn.close()

3.3.2:执行结果:

3.4:通过haproxy反向代理想rabbitMQ服务器获取消息:

3.4.1:获取消息的消费者脚本内容:

#!/bin/env  python
#coding:utf-8
#Author: ZhangJie
import  pika
#用户名
cert = pika.PlainCredentials("guest","guest")
#连接到服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.10.103",5671,"/",cert))
#创建频道
channel = conn.channel()
#声明消息队列,如果不存在就创建
channel.queue_declare(queue="hello")
# 定义一个回调函数来处理,这边的回调函数就是将信息打印出来。
def callback(ch,method,properties,body):
    print("[x] Received %r" % body)
channel.basic_consume(callback,
                      queue="test2",
                      no_ack=True) # no_ack=True表示在回调函数中不需要发送确认标识
print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

3.4.2:执行结果:

原文地址:https://www.cnblogs.com/you0329/p/8591471.html