12.RabbitMQ多机集群

配置两台Linux CentOS 6.7虚拟主机

CentOS6.7下载地址
https://pan.baidu.com/s/1i5GPg9n
 
安装视频下载

https://pan.baidu.com/s/1qYSgohQ
 

rabbitmq2
12.RabbitMQ多机群集
rabbitmq1
12.RabbitMQ多机群集


1、分别在两台主机上修改/etc/hosts
192.168.169.100 rabbitmq1
192.168.169.110 rabbitmq2
 
2、从客户端上传RPM包
12.RabbitMQ多机群集
RPM包下载地址
https://pan.baidu.com/s/1dE1iaGx
 

3、下载阿里云Yum源

#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo

依次安装Erlang,Rabbitmq
#yum -y install openssl
#yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
#yum -y install erlang-17.4-1.el6.x86_64.rpm
#yum -y install rabbitmq-server-3.6.3-1.noarch.rpm

4、启动rabbitmq1,rabbitmq2上的RabbitMQ
rabbitmq1
#service rabbitmq-server start
rabbitmq2
#service rabbitmq-server start

5、从rabbitmq1主机上拷贝文件到rabbitmq2
scp /var/lib/rabbitmq/.erlang.cookie  rabbitmq2:/var/lib/rabbitmq
6、在rabbitmq1,rabbitmq2上分别关闭防火墙
[root@rabbitmq1 ~]# service iptables stop
[root@rabbitmq2 ~]# service iptables stop

7、在rabbitmq1,rabbitmq2上分别启动RabbitMQ
[root@rabbitmq1 ~]# service rabbitmq-server start
[root@rabbitmq2 ~]# service rabbitmq-server start
 
8、在rabbitmq2上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
9、查看各节点上的状态
rabbitmqctl cluster_status
 
10、在rabbitmq1,rabbitmq2节点上分别添加用户和设置控制台插件
 
[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
 
[root@rabbitmq1 ~]# rabbitmqctl add_user admin admin
 
[root@rabbitmq1 ~]# rabbitmqctl set_permissions admin ".*" ".*" ".*"
 
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
 
[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
 
 

11、在rabbitmq1节点上安装haproxy

yum -y install haproxy
12、配置haproxy
cp /etc/haproxy/haproxy.cfg  /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg
添加配置信息
12.RabbitMQ多机群集

# AMQP front end: clients connect to 192.168.169.100:5670 and are balanced
# round-robin across the two RabbitMQ nodes (AMQP port 5672 on each).
listen rabbitmq_local_cluster 192.168.169.100:5670
    mode tcp
    balance roundrobin
    # Each server in a section needs its own name so the stats page and logs
    # can tell the nodes apart.
    server rabbit1 192.168.169.100:5672 check inter 5000 rise 2 fall 3
    server rabbit2 192.168.169.110:5672 check inter 5000 rise 2 fall 3

# HAProxy statistics page, served on port 8100 under /stats.
listen private_monitoring :8100
    mode http
    option httplog
    stats enable
    stats uri /stats
    stats refresh 60s

13、启动haproxy

service haproxy start

14、查看haproxy控制台
http://192.168.169.100:8100/stats
12.RabbitMQ多机群集
15、建立RabbitMQ策略
12.RabbitMQ多机群集

16、建立持久队列
 
12.RabbitMQ多机群集
 
测试代码
Producer.java
package com.test.cluster;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
 
public class Producer {
 
    public static void main(String[] args) throws Exception {
   
    //使用默认端口连接MQ
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.100"); //使用默认端口5672
        factory.setPort(5670);
        Connection conn = factory.newConnection(); //声明一个连接
        Channel channel = conn.createChannel(); //声明消息通道
   
        String exchangeName = "TestEXG";//交换机名称
        String routingKey = "RouteKey1";//RoutingKey关键字
        channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机
        String queueName = "ClusterQueue";//队列名称
        Map arg = new HashMap();
        arg.put("x-ha-policy", "all");
        channel.queueDeclare(queueName, false, false, false, arg);
 
        channel.queueBind(queueName, exchangeName, routingKey);//定义声明对象
        
        byte[] messageBodyBytes = "Hello, world!".getBytes();//消息内容
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//发布消息
        //关闭通道和连接
channel.close();
conn.close();
    }
 
}
 
 
Consumer.java
 
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
//通过channel.basicAck向服务器发送回执,删除服务上的消息
public class Consumer {
 
    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.100"); //使用默认端口5672
        factory.setPort(5670);
        Connection conn = factory.newConnection(); //声明一个连接
        Channel channel = conn.createChannel(); //声明消息通道
        String exchangeName = "TestEXG";//交换机名称
        String queueName = "ClusterQueue";//队列名称
        channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机
        channel.queueBind(queueName, exchangeName, "RouteKey1");
 
        channel.basicQos(1); //server push消息时的队列长度
 
        //用来缓存服务器推送过来的消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
 
        channel.basicConsume(queueName, false, consumer);
 
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("Received " + new String(delivery.getBody()));
 
            //回复ack包,如果不回复,消息不会在服务器删除
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
 
关闭掉其中一个RabbitMQ,测试群集效果
 
原文地址:https://www.cnblogs.com/zzpblogs/p/8168828.html