mq系列rabbitmq-02集群+高可用配置

一。 rabbitmq 集群高可用介绍

   rabbitmq的broker是由一个或者多个物理节点组成 rabbtmq程序之间共享用户 虚拟主机 队列 交换机,绑定和参数,也可以将多个节点

组合成一个集群,所有的数据都会被拷贝到集群中不同的节点 除了队列数据  队列数据可以通过一些策略分配到不同的节点 具体参考高可用队列

映射(http://www.rabbitmq.com/ha.html)

二。 rabbitmq集群

 参考文档

   http://www.rabbitmq.com/clustering.html

   http://www.rabbitmq.com/ha.html

1》集群安装

模拟环境 

192.168.58.149 node2
192.168.58.150 node3
192.168.58.151 node4
rabbitmq集群需要通过主机名互相访问 所以必须在三台机器 /etc/hosts配置以上内容 确保主机名正确 

三台机器使用rabbitmq进行通信 使用的安全策略是 erlang.cookie 就是一个随机字符串 三个必须保持一致 否则 无法添加节点到集群
该cookie文件 可能位于  /var/lib/rabbitmq/.erlang.cookie 或者 ~/.erlang.cookie任何一台机器(比如node1) 启动

 service rabbitmq-server start 
拷贝生成的cookie文件到node2和node3
scp ~/.erlang.cookie root@node3:~/
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
scp ~/.erlang.cookie root@node4:~/
scp /var/lib/rabbitmq/.erlang.cookie root@node4:/var/lib/rabbitmq/
分别三台机器安装 rabbitmq-server 具体参考(http://blog.csdn.net/liaomin416100569/article/details/78476783)
分别启动三台机器 三台机器都执行

rabbitmq-server -detached
分别查看集群状态 每个机器单独的没有关联起来 所有各自就是各自的集群

node2上(running_nodes表示运行的集群 只有自己)

[root@node2 rabbitmq]# rabbitmqctl cluster_status
Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]}]
...done.
node3上
[root@node3 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
[{nodes,[{disc,[rabbit@node3]}]},
 {running_nodes,[rabbit@node3]},
 {cluster_name,<<"rabbit@node3">>},
 {partitions,[]}]
...done.
node4上
[root@node4 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@node4 ...
[{nodes,[{disc,[rabbit@node4]}]},
 {running_nodes,[rabbit@node4]},
 {cluster_name,<<"rabbit@node4">>},
 {partitions,[]}]
...done.
创建集群 集群必须有一个中心节点 所有的其他节点会被添加到这个中心节点 比如我这里是node2 集群有个集群名称 如果三个节点被添加到一个集群中 集群名称必须是相同 默认的名字是rabbit 比如node2 一般在集群中的集群名 就是 集群名@机器名 node2就是rabbmit@node2将node3和node4 停止应用 添加到node2集群中 重启服务
[root@node3 ~]# rabbitmqctl stop_app
Stopping node rabbit@node3 ...
...done.
[root@node3 ~]# rabbitmqctl join_cluster rabbit@node2
Clustering node rabbit@node3 with rabbit@node2 ...
...done.
[root@node3 ~]# rabbitmqctl start_app
Starting node rabbit@node3 ...
...done.
任何一台机器查看集群状态 发现三台集群都加入了运行集群中
[root@node3 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
[{nodes,[{disc,[rabbit@node2,rabbit@node3,rabbit@node4]}]},
 {running_nodes,[rabbit@node2,rabbit@node4,rabbit@node3]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]}]
...done.

2》集群维护

 》》 集群重置

   如果需要将某个节点从集群中移除 可以在当前节点 使用reset命令

 比如将node3节点移除(移除后集群状态运行节点就只有自己了)

[root@node3 ~]# rabbitmqctl stop_app
Stopping node rabbit@node3 ...
...done.
[root@node3 ~]# rabbitmqctl reset
Resetting node rabbit@node3 ...
...done.
[root@node3 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
[{nodes,[{disc,[rabbit@node3]}]}]
...done.
如果某个中心节点被重置了 有可能一些其他节点无法重置可以使用

rabbitmqctl force_reset

rabbitmq支持远程重置集群 比如

假设 node2 网络中断 (模拟 在node2上 执行 service network stop) 

node3执行

rabbitmqctl cluster_status
执行很长时间后 自动将集群名称 命名为  rabbitmq@node3 

》》其他维护命令 

  参考 (http://www.rabbitmq.com/cli.html  http://www.rabbitmq.com/man/rabbitmqctl.1.man.html)

三。 rabbitmq高可用

 rabbitmq的交换器等数据在所有集群节点中共享比如在node2webgui上添加一个交换器test_ex


node3的webgui上查看 发现多了test_ex交换器


试着添加用户 bingding等 都会自动同步

但是当你试着去添加队列时 发现添加界面有个选择项 将队列信息添加到哪个节点上


添加后虽然你使用node3web查看也能看到该队列但是 该队列的数据是被存储在node2节点上

rabbitmq的高可用 也就是说 队列以及队列消息信息的高可用 防止一台机器宕机后 队列消息的丢失

rabbitmq支持使用镜像队列(主从)模式将队列通过某种策略匹配到不同的集群节点上 交换器和绑定根据这些规则来获取到对应的队列

每个队列都有主和从机(多个) 主负责读写 从用于备份

支持三种镜像复制模式

ha-mode

ha-params

Result

all

(absent)

队列映射到所有节点 新节点加入也会被映射

exactly

count

集群中队列的从机个数. 1就意味着没从机 如果唯一节点挂了 意味着队列无法访问了,

数量 2意味 1主1从 如果主挂了 从自动接替称为主,, 从机个数 = 配置参数- 1.

如果配置参数超过了集群总节点数 默认是all 映射到所有节点,如果指定数量节点挂了

其他节点自动成为它的从节点.

nodes

node names

指定由哪些节点称为 从节点 比如linux rabbitmqctl set_policy ha-nodes "^nodes."
   '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

通过命令  rabbitmqctl set_policy 设置队列复制的逻辑

该命令语法
rabbitmqctl set_policy ha-nodes "^nodes."
   '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

set_policy 策略名称 匹配队列名的正则表达式  被复制的方式(上面的ha-mode和参数)

以上三种模式举例比如:

所有名字ha开头的队列都复制到所有的集群主机

rabbitmqctl set_policy ha-all "^ha." '{"ha-mode":"all"}'
所有名字 two.开头的队列随机复制到任意一台集群主机 数量应该是2 还有一个主机

rabbitmqctl set_policy ha-two "^two." 
   '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
所有nodes开头的队列都复制到node2和node3

rabbitmqctl set_policy ha-nodes "^nodes." 
   '{"ha-mode":"nodes","ha-params":["rabbit@node2", "rabbit@node3"]}'

四。 rabbitmqapi测试集群和高可用
使用高可用two.开头的自动复制到两个节点测试

执行

rabbitmqctl set_policy ha-two "^two." 
   '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

使用简单模式发布一个消息
Pub.java代码

package cn.et.p6;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息发送者 - 生产者
 * @author jiaozi
 *
 */
public class Pub {
	/**
	 * 队列名称 应该以two.开头
	 */
	private final static String QUEUE_NAME = "two.hello";
	public static void main(String[] args) throws Exception {
		//连接远程rabbit-server服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.58.150");
		factory.setPort(5672);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		//定义创建一个队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String message = "Hello World!";
		//发送消息
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		channel.close();
		connection.close();
	}

}
因为连接的 150(node3)所有 150是主机 刚好队列名称 是two.开头 所以应该有个从机被复制 查看队列消息


点击队列名称进入可以看到从节点 到底是哪台


这里 如果挂掉主节点node3  发现node2自动接管成为主节点

这里消息接受者 无论是从 node2,node3,node4都是可以消费的 

消费者测试

package cn.et.p6;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 消息接受者 - 消费者
 * 
 * @author jiaozi
 *
 */
public class Rec {
	/**
	 * 获取消息队列名称
	 */
	private final static String QUEUE_NAME = "two.hello";
	/**
	 * 异步接收
	 * @throws Exception 
	 */
	public static void asyncRec() throws Exception{
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.58.149");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		//消费者也需要定义队列 有可能消费者先于生产者启动 
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//定义回调抓取消息
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println(" [x] Received '" + message + "'");
			}
		};
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
	public static void main(String[] args) throws Exception {
		asyncRec();
	}
}
如果 150挂掉 代码中需要重连其他的机器 所以需要定义集群集群中所有机器 这里生产者和消费者连接代码相同 只贴出生产者

package cn.et.p6;

import java.io.IOException;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息发送者 - 生产者
 * @author jiaozi
 *
 */
public class Pub {
	static Address[] addresses=new Address[]{
			new Address("192.168.58.149"),
			new Address("192.168.58.150"),	
			new Address("192.168.58.151")	
		};
	/**
	 * 队列名称 应该以two.开头
	 */
	private final static String QUEUE_NAME = "two.hello";
	public static void main(String[] args) throws Exception {
		//连接远程rabbit-server服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setPort(5672);
		//默认自动恢复重连 http://www.rabbitmq.com/api-guide.html#recovery
		factory.setAutomaticRecoveryEnabled(true);
		//连接不上之后 多少毫秒重连
		factory.setNetworkRecoveryInterval(10000);
		//不会负载均衡只会抓取第一个可用的连接 
		Connection connection = factory.newConnection(addresses);
		Channel channel = connection.createChannel();
		//定义创建一个队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String message = "Hello World!";
		//发送消息
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		channel.close();
		connection.close();
	}

}
默认只是抓取可以连接的第一个服务器 默认可以自己实现一个客户端负责均衡器  弄成rr轮询(连不上就轮询下一个 重写该类 重写newConnection方法)


这里我就懒得去实现了  也可以使用第三方反向代理来负载均衡 比如nginx或者haproxy或者lvs+keepalived




原文地址:https://www.cnblogs.com/liaomin416100569/p/9331167.html