RocketMQ

一、安装准备

1)修改配置

修改runserver文件:

vi runserver.sh

1597716798394

修改为如下所示:

1597716847927

修改runbroker文件:

vi runbroker.sh

1597716879152

修改为:

1597716909819

2)启动rocketmq

启动nameserver

# 前台启动
./mqnamesrv
# 后台启动
nohup ./mqnamesrv > /dev/null 2>&1 &

启动broker

方式一:通过“-n”指定nameserver

# 前台启动
./mqbroker -n 192.168.0.112:9876 autoCreateTopicEnable=true
# 后台启动:端口必须指定9876
nohup ./mqbroker -n 192.168.0.112:9876 autoCreateTopicEnable=true > /dev/null 2>&1 & 

方式二:通过“-c‘配置文件

# vi /opt/apps/rocketmq-4.7.1/conf/broker.conf
brockerIP1=192.168.0.112
namesrvAddr=192.168.0.112:9876
brockerName=zomicc
# 前台启动
./mqbroker -c  /opt/apps/rocketmq-4.7.1/conf/broker.conf
# 后台启动
nohup ./mqbroker -c  /opt/apps/rocketmq-4.7.1/conf/broker.conf > /dev/null 2>&1 & 

命令行测试

# 编辑profile文件
vi /etc/profile
# 设置nameserver服务器
export NAMESRV_ADDR=172.17.0.2:9876
# 刷新
source /etc/profile
# 测试消息发送命令
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# 测试消息接收命令
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

3)安装管理工具

A:docker安装

# 拉取镜像 
docker pull styletang/rocketmq-console-ng:1.0.0 
# 创建并启动容器 (后台启动可加上-di参数)
单机或者集群都只需用指定一个地址:
docker run --name rmq-console -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.112:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0
指定多个地址也不会报错:
docker run --name rmq-console -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.112:9876;192.168.0.112:9877 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0

B:非docker安装

从https://github.com/apache/rocketmq-externals下载rocketmq-externals,进入rocketmq-console。

修改rocketmq-console/src/main/resources/application.properties中的rocketmq.config.namesrvAddr。

1597834305301

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-2.0.0.jar

浏览器访问http://192.168.0.112:8082/,看到如下图证明管理工具设置成功。

1597834730501

4)集群搭建2m2s

基于方便起见以及对docker的练习,直接采用docker安装。

注意:配置文件在容器中的/etc/rocketmq/brocker.conf中。

两个可使用便捷命令

docker cp ./broker.conf rmqbroker01:/etc/rocketmq/broker.conf
docker cp rmqbroker01:/etc/rocketmq/broker.conf  ./

创建两个nameserver

# nameserver1 
docker create -p 9876:9876 --name rmqserver01  
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
-e "JAVA_OPTS=-Duser.home=/opt"  
foxiswho/rocketmq:server-4.5.1
====================================================
# nameserver2 
docker create -p 9877:9876 --name rmqserver02  
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
-e "JAVA_OPTS=-Duser.home=/opt"  
foxiswho/rocketmq:server-4.5.1 

创建两个master brocker

# master broker01 
docker create --net host --name rmqbroker01  
-e "JAVA_OPTS=-Duser.home=/opt"  
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
foxiswho/rocketmq:broker-4.5.1 
# 配置 
namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
brokerClusterName=testCluster 
brokerName=broker01
brokerId=0 #0是master非0是slave
deleteWhen=04 #删除文件时间点,默认凌晨 4点
fileReservedTime=48 #文件保留时间,默认48小时
brokerRole=SYNC_MASTER #Broker 的角色 ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master 							从机SLAVE
flushDiskType=ASYNC_FLUSH #刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
brokerIP1=192.168.0.112
brokerIp2=192.168.0.112
listenPort=10911
================================
# master broker02 
docker create --net host --name rmqbroker02  
-e "JAVA_OPTS=-Duser.home=/opt"  
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
foxiswho/rocketmq:broker-4.5.1 
# 配置
namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
brokerClusterName=testCluster 
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48 
brokerRole=SYNC_MASTER 
flushDiskType=ASYNC_FLUSH 
brokerIP1=192.168.0.112 
brokerIp2=192.168.0.112 
listenPort=10811 

创建两个slave brocker

# slave broker03 
docker create --net host --name rmqbroker03  
-e "JAVA_OPTS=-Duser.home=/opt"  
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
foxiswho/rocketmq:broker-4.5.1 
# 配置
namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
brokerClusterName=testCluster
brokerName=broker01 
brokerId=1 
deleteWhen=04 
fileReservedTime=48 
brokerRole=SLAVE 
flushDiskType=ASYNC_FLUSH 
brokerIP1=192.168.0.112 
brokerIp2=192.168.0.112 
listenPort=10711
=========================
# slave broker04
docker create --net host --name rmqbroker04  
-e "JAVA_OPTS=-Duser.home=/opt"  
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
foxiswho/rocketmq:broker-4.5.1 
# 配置 
namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
brokerClusterName=testCluster 
brokerName=broker02 
brokerId=1 
deleteWhen=04 
fileReservedTime=48 
brokerRole=SLAVE 
flushDiskType=ASYNC_FLUSH 
brokerIP1=192.168.0.112 
brokerIp2=192.168.0.112 
listenPort=10611 

启动容器

#启动容器 
docker start rmqserver01 rmqserver02 
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04

可能出现的错

java.net.BindException: Address already in use
可能原因1:端口相同了。
可能原因2:4个broker在同一台机器上,listenPort间隔太近了。可适当调大一点。
The Name Server Address[192.168.0.112:9876;192.168.0.112:9877 ] illegal, please set it as follows, "127.0.0.1:9876;192.168.0.1:9876"
是因为多了一个非法的空格。注意删除一切无用的尾部空格。

浏览器访问http://192.168.0.112:8082/,看到如下图才证明集群搭建成功。否则检查相关配置(尤其注意一下多余空格。比如一个集群名为“testCluster”,一个为“testCluster ”,这样也是有问题的)

1597834543640

systemctl firewalld.service stop

二、集群架构

网络架构图总览

https://github.com/apache/rocketmq/tree/master/docs/cn

1598087523796

消息存储

ConsumeQueue(offset)+CommitLog

负载均衡

从队列角度看:Topic 和 Queue 是 1 对多的关系,一个 Topic 下可以包含多个 Queue,主要用于负
载均衡。发送消息时,用户只指定 Topic,Producer 会根据 Topic 的路由信息选择具体发到
哪个 Queue 上。Consumer 订阅消息时,会根据负载均衡策略决定订阅哪些 Queue 的消息。

从集群角度说:一个队列下的Queue会分片到不同的broker上,可以降低单Master的压力。

从主从配置说:从机也支持消息消费,可以降低主机的压力。

负载均衡策略都有哪些?

consumer.setAllocateMessageQueueStrategy(实现AllocateMessageQueueStrategy接口的策略);

(1)平均分配算法

假如有10个队列,4个消费者,则分配规则是:10除4等于2余2。则每个消费者先分配2个Queue,余数的2个依次分给Consumer1和Consumer2。

1597843353089

(2)环形分配算法

1597843063127

(3)就近机房算法

1597843124743

刷盘策略

(1)同步刷盘 SYNC_FLUSH

返回成功状态时,消息已经被写入磁盘。

消息写入内存 pagecache 后,立即通知刷盘线程,刷盘完成后,返回消息写成功的状态。

(2)异步刷盘 ASYNC_FLUSH

返回成功状态时,消息只是被写入内存 pagecache,写操作返回快,吞吐量达大,当内存里的消息积累到一定程度时,统一出发写磁盘动作,快速写入。

复制策略

(1)同步复制 SYNC_MASTER

master 和 slave 都写成功后返回成功状态。好处是如果master出故障,slave上有全部备份,容易恢复。缺点是增大延迟,降低吞吐量。

(2)异步复制 ASYNC_MASTER

只要 master 写成功就返回成功状态。好处是低延迟、高吞吐,缺点是如果 master 出故障,数据没有写入 slave,就会有丢失。

推荐 ASYNC_FLUSH + SYNC_MASTER。在消息响应和消息可靠性之间做一个平衡。

三、基础用法

请参考:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

消息发送

同步发送:应用于可靠性要求比较高的场景。比如:重要的消息通知,短信通知等。

1598068347093

异步发送:通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

1598068417386

单向发送:主要用在不特别关心发送结果的场景,例如日志发送。

1598068435338

消息接收

注册监听器接收消息

1598068489299

广播模式、集群模式

同一个消费组内:

集群模式下的同一个Message只会被一个消费者消费;

广播模式下的同一个Message会被所有消费者消费。

// 集群模式 (默认)
consumer.setMessageModel(MessageModel.CLUSTERING); 
// 广播模式 
consumer.setMessageModel(MessageModel.BROADCASTING);

消息标签

可以为消息设置标签,方便灵活分类、消费过滤等。(Topic可以看做是一级分类,Tag可以看做是二级分类)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

四、常见问题分析

顺序消息

场景举例:订单确认->付款成功->通知发货->签收成功

单个Queue中,消息的存储是有序的。要想保证消息的顺序性,需要保证消息发送到broker的有序性和消息接收处理完成的有序性。如何实现:

①生产者采用同步单线程发送消息至同一个Queue

public class OrderlyProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("p-GroupOrder01");
        producer.setNamesrvAddr("192.168.0.112:9876");
        producer.start();

        List<String> order1 = new ArrayList<>();
        order1.add("订单1---创建");
        order1.add("订单1---付款");
        order1.add("订单1---发货");
        order1.add("订单1---签收");
        order1.add("订单1---完成");
        List<String> order2 = new ArrayList<>();
        order2.add("订单2---创建");
        order2.add("订单2---付款");
        order2.add("订单2---发货");
        order2.add("订单2---签收");
        order2.add("订单2---完成");
        resolveOrder(producer, order1, 1L);
        resolveOrder(producer, order2, 2L);

        producer.shutdown();
    }

    public static void resolveOrder(DefaultMQProducer producer, List<String> orderDetail,
                                    Long orderID) throws Exception{
        int size = orderDetail.size();
        for (int i = 0; i < size; i++) {
            Message msg = new Message("TopicTest", orderDetail.get(i).getBytes("UTF-8"));
            SendResult send = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;
                    long index = id % mqs.size();//根据订单id选择发送queue
                    return mqs.get((int) index);
                }
            }, orderID);
        }
    }
}

②消费者采用MessageListenerOrderly监听器

MessageListenerOrderly的作用是通过锁的方式,保证同一时刻只有一个线程能从同一个Queue中拉取消息。

而不是一个Queue只有一个处理线程。

public class OrderConsumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c-GroupOrder01");
        consumer.setNamesrvAddr("192.168.0.112:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到订单对每个queue(分区)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName()
                            + " queueId=" + msg.getQueueId() + ", content:"
                            + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

延迟消息

场景举例:唯品会购物车倒计时取消订单。30min后去检查订单的状态,如果还是未付款就取消订单释放库存。

设置消息延迟级别:

// String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
 message.setDelayTimeLevel(3);

事务消息

场景举例:Bob向Smith转账

1598091392963

A、B都会有问题。主要原因在于发消息与扣款非原子性。通过事务消息可以解决。

1598091479276

1598093771820

代码示例:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("p-TranGroup");
        producer.setNamesrvAddr("192.168.0.112:9876");
        producer.setTransactionListener(new TransactionListenerImpl());// 设置事务监听
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TranTopic", ("hello transaction " + i).getBytes());
            TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
            System.out.println(result);
        }
        producer.shutdown();
    }
}

class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 这里调用本地事务的执行
        try {
            System.out.println("start local transaction....");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 这里进行本地事务状态的查询
        int status = new Random().nextInt(3);
        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            default:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

消息去重

正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题(提示:MessageID),这也是RocketMQ不解决消息重复的问题的原因。

高可用、高可靠

①多master部署,防止单节点故障,以此保障rocketmq的可用性。

②Producer的send方法本身支持内部重试,重试逻辑如下:1)至多重试3次;2)如果发送失败,则轮转到下一个Broker;3)这个方法的总耗时不超过producer.setSendMsgTimeout()设置的最大值,默认10s。这仍然无法保证消息100%成功,为保证消息一定成功,可以再send同步发送失败时,将消息存储到db,有后台任务去处理。以此保证rocketmq的可靠性。

"我们所要追求的,永远不是绝对的正确,而是比过去的自己更好"
原文地址:https://www.cnblogs.com/zomicc/p/13531183.html