RocketMq分析

RocketMq分析

1、几个消费队列的关键问题

(1)如何保证消息队列的高可用

(2)如何保证消息传输的高可靠性

(3)如何保证消息消费的幂等性

(4)如何保证消息消费的顺序性

(5)有几百万消息持续积压几小时怎么解决?

2、RocketMq如何解决上面几个问题

(1)如何保证消息队列的高可用

首先要知道rocketmq的主要结构

四个模块

producer:消息生产者

broker:接收消息持久化,维持消息队列

nameServer:维护topic tag ->> broker的路径

consumer:从broker中拿消费的消息

所有这些都是集群模式,分布式,可以保证服务的高可用。

nameServer的服务发现和移除

但是这个broker在nameServer存储的路由信息有一定时间的不可用的可能性

使用消息发送的高可用来保证该问题的解决

(1)重试机制

(2)不可用broker一段时间规避

NameServer 

nameServer主要用来保存从topic到broker路由信息

主要数据结构

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

  主要包括以上五种信息集合

(1)topic  --->   broker的信息(brokerName、其他信息)

(2)broker  ---->  address信息(broker对应的地址:具体地址)

(3)brokerLive信息(心跳检测会更新broker对应的last更新时间)

两个关键功能:路由注册、路由摘除

 (1)路由注册

broker启动之后会开始心跳,会把自己注册到nameServer

获取所有broker列表

遍历注册自己

然后把上面四个信息填充完成

(2)路由删除

有两个出发点

  (1)NameServer定时扫描心跳信息,如果上一次心跳和当前时间差超过120s,移除该broker

  (2)Broker在关闭的时候

公共方法

 public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

移除四个信息中这个broker的元素

 消息的发送

1、消息体的主要结构

关键属性 

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;

  

 

原文地址:https://www.cnblogs.com/zhangchiblog/p/14192157.html