RocketMQ集群搭建(双主双从)

测试机器有限, 使用2台机器搭建集群; 192.168.8.113和192.168.8.114

nameServ1 注册中心 192.168.8.113:9876
nameServ1 注册中心 192.168.8.114:9876
broker-a broker-a --master 192.168.8.113:10911     broker-a.properties
broker-b-s broker-b --slave 192.168.8.113:10912     broker-b-s.properties
broker-b broker-b --master 192.168.8.114:10911     broker-b.properties
broker-a-s broker-a --slave 192.168.8.114:10912     broker-a-s.properties
 

1、下载Rocketmq

下载rocketmq-all-4.5.2-bin-release.zip,并解压到/usr/local  重命名为rocketmq

https://rocketmq.apache.org/release_notes/release-notes-4.5.2/

rocketmq是java编写,所以要配置jdk

2、创建目录

2台机器上执行以下命令创建目录

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

mkdir /usr/local/rocketmq/logs

mkdir /usr/local/rocketmq/store2

mkdir /usr/local/rocketmq/store2/commitlog

mkdir /usr/local/rocketmq/store2/consumequeue

mkdir /usr/local/rocketmq/store2/index

3、修改配置文件

cd  conf  

cp  -r  2m-2s-async   myconf

broker-a.properties

brokerClusterName=DefaultCluster
brokerName=broker-a
#集群中 0 表示 Master,>0 表示 Slave
brokerId=0
brokerRole=ASYNC_MASTER
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#指定broker的IP
brokerIP1=192.168.8.113
#nameServer地址,集群用分号分割
namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#checkTransactionMessageEnable=false
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessaeThreadPoolNums=128

#发送消息是否使用可重入锁
useReentrantLockWhenPutMessage=true
waitTimeMillsInSendQueue=300  #或者更大

broker-b-s.properties

brokerClusterName=DefaultCluster
brokerName=broker-b
#集群中 0 表示 Master,>0 表示 Slave
brokerId=1
brokerRole=SLAVE
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#指定broker的IP
brokerIP1=192.168.8.113
#nameServer地址,集群用分号分割
namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store2
#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store2/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store2/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store2/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store2/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#checkTransactionMessageEnable=false
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessaeThreadPoolNums=128

#发送消息是否使用可重入锁
useReentrantLockWhenPutMessage=true
waitTimeMillsInSendQueue=300  #或者更大

broker-b.properties

brokerClusterName=DefaultCluster
brokerName=broker-b
#集群中 0 表示 Master,>0 表示 Slave
brokerId=0
brokerRole=ASYNC_MASTER
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#指定broker的IP
brokerIP1=192.168.8.114
#nameServer地址,集群用分号分割
namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径

storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#checkTransactionMessageEnable=false
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessaeThreadPoolNums=128

#发送消息是否使用可重入锁
useReentrantLockWhenPutMessage=true
waitTimeMillsInSendQueue=300  #或者更大

broker-a-s.properties

brokerClusterName=DefaultCluster
brokerName=broker-a
#集群中 0 表示 Master,>0 表示 Slave
brokerId=1
brokerRole=SLAVE
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#指定broker的IP
brokerIP1=192.168.8.114
#nameServer地址,集群用分号分割
namesrvAddr=192.168.8.113:9876;192.168.8.114:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store2
#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store2/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store2/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store2/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store2/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#checkTransactionMessageEnable=false
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessaeThreadPoolNums=128

#发送消息是否使用可重入锁
useReentrantLockWhenPutMessage=true
waitTimeMillsInSendQueue=300  #或者更大

注意: 上面配置文件中    【#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=false】 将topic和订阅组全部关闭自动创建了。  所以在使用Java-API调用之前,一定要手动创建Topic和消费者订阅组,不然消费者端无法获取到消息。 

sh ./bin/mqadmin updateTopic -t myTopic -c DefaultCluster -n "192.168.8.113:9876;192.168.8.114:9876"

sh bin/mqadmin updateSubGroup -c DefaultCluster -g customerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

创建主题:myTopic    ,    订阅组: customerGroup

4、修改rocketmq启动脚本

适当修改jvm内存大小

vim /usr/local/rocketmq/bin/runbroker.sh

vim /usr/local/rocketmq/bin/runserver.sh

vim /usr/local/rocketmq/bin/tools.sh

.

5、启动注册中心nameSrv

    2台机器都启动nameSrv

#提前创建好目录

nohup sh bin/mqnamesrv > ./logs/namesrv.log 2>&1 &

6、启动broker

192.168.8.113 执行: 

nohup sh bin/mqbroker -c conf/myconf/broker-a.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-a.log 2>&1 &

nohup sh bin/mqbroker -c conf/myconf/broker-b-s.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-b-s.log 2>&1 &

192.168.8.114执行:

nohup sh bin/mqbroker -c conf/myconf/broker-b.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-b.log 2>&1 &

nohup sh bin/mqbroker -c conf/myconf/broker-a-s.properties -n "192.168.8.113:9876;192.168.8.114:9876" > ./logs/broker-a-s.log 2>&1 &

7、查看是否启动

可以查看启动日志
使用jps  , 正常情况可以看到 2个broker和1个namesrv 
常规查看进程方式:ps -ef|grep java   


 

8、创建topic   和  订阅组

updateTopic:该命令执行会在broker所在机器创建一个新的topic,若topic已存在,则会更新topic的属性

sh ./bin/mqadmin updateTopic -t myTopic -c DefaultCluster -n "192.168.8.113:9876;192.168.8.114:9876"

创建topic时使用-b参数指定broker的地址,可以指定在哪个broker上创建。如果使用-c参数指定集群名称,则可以为该集群上的每一个broker都创建一份topic信息。建议使用-c指定集群名称,减少分别在不同的broker上创建手误导致topic属性不同的概率,如果是对集群扩容,则可以通过指定新的broker地址在扩容的机器上创建一份新的topic信息

使用该命令创建topic的读写队列数默认为8,可以通过-r -w指定topic的读写队列数。注意:该命令创建topic的默认队列数无法通过任何配置更改,除非修改源码。

其它参数说明查看--help

deleteTopic:从Broker和Name Server删除Topic

sh ./bin/mqadmin   deleteTopic   -t   myTopic   -c   DefaultCluster   -n   "192.168.8.113:9876;192.168.8.114:9876"

该命令执行完成后会将指定集群下的所有broker节点的topic信息删除,并清除指定地址的name server上该topic的路由信息。所以name server是一个集群的话,请指定集群地址,否则未指定的name server的topic路由信息可能经过broker一个心跳时间后清除。未被清除的那段时间内,生产者依然可以从name server上获取到topic路由信息,正常发送消息。但是发送过程中不会有异常,broker接收到消息处理的时候才会失败,并将结果响应给客户端。

updateSubGroup:该命令执行会在broker所在机器创建一个新的订阅组,若订阅组已存在,则会更新订阅组的属性

sh bin/mqadmin updateSubGroup -c DefaultCluster -g customerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

创建订阅组时使用-b参数指定broker的地址,可以指定在哪个broker上创建。如果使用-c参数指定集群名称,则可以为该集群上的每一个broker都创建一份新的订阅组信息。建议使用-c指定集群名称,减少分别在不同的broker上创建订阅组时手误导致每个机器上的订阅组属性不同的概率,如果是对集群扩容,则可以通过指定新的broker地址在扩容的机器上创建一份新的订阅组信息

不指定消费模型时,默认为集群消费

创建该订阅组时并不会同时创建重试topic,但是该订阅组第一次订阅topic成功时,会创建一个重试topic

其它参数说明使用--help查看


 

deleteSubGroup:从Broker删除订阅组

sh bin/mqadmin  deleteSubGroup  -g  customerGroup  -c  DefaultCluster  -n  "192.168.8.113:9876;192.168.8.114:9876"

将指定订阅组从broker删除,同时将使用该订阅组名称创建的重试topic及死信topic信息统统清除

9、RocketMQ--JavaAPI

使用前创建Topic和订阅组

sh ./bin/mqadmin updateTopic -t myTopic -c DefaultCluster -n "192.168.8.113:9876;192.168.8.114:9876"

sh bin/mqadmin updateSubGroup -c DefaultCluster -g customerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

sh bin/mqadmin updateSubGroup -c DefaultCluster -g producerGroup -n "192.168.8.113:9876;192.168.8.114:9876"

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
        </dependency>

生产者:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("192.168.8.113:9876;192.168.8.114:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("myTopic" ,"TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
 

消费者:


import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class SyncConsumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("customerGroup");
consumer.setNamesrvAddr("192.168.8.113:9876;192.168.8.114:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("myTopic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt ext : msgs) {
String body = new String(ext.getBody());
System.out.println(body);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
 

10、RocketMQ--控制台

docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=192.168.204.62:9876;192.168.204.63:9876 -Drocketmq.config.isVIPChannel=false" -p 8294:8080 -t styletang/rocketmq-console-ng



 

原文地址:https://www.cnblogs.com/360minitao/p/14842297.html