RocketMq分析
1、几个消费队列的关键问题
(1)如何保证消息队列的高可用
(2)如何保证消息传输的高可靠性
(3)如何保证消息消费的幂等性
(4)如何保证消息消费的顺序性
(5)有几百万消息持续积压几小时怎么解决?
2、RocketMq如何解决上面几个问题
(1)如何保证消息队列的高可用
首先要知道rocketmq的主要结构
四个模块
producer:消息生产者
broker:接收消息持久化,维持消息队列
nameServer:维护topic tag ->> broker的路径
consumer:从broker中拿消费的消息
所有这些都是集群模式,分布式,可以保证服务的高可用。
nameServer的服务发现和移除
但是这个broker在nameServer存储的路由信息有一定时间的不可用的可能性
使用消息发送的高可用来保证该问题的解决
(1)重试机制
(2)不可用broker一段时间规避
NameServer
nameServer主要用来保存从topic到broker路由信息
主要数据结构
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
主要包括以上五种信息集合
(1)topic ---> broker的信息(brokerName、其他信息)
(2)broker ----> address信息(broker对应的地址:具体地址)
(3)brokerLive信息(心跳检测会更新broker对应的last更新时间)
两个关键功能:路由注册、路由摘除
(1)路由注册
broker启动之后会开始心跳,会把自己注册到nameServer
获取所有broker列表
遍历注册自己
然后把上面四个信息填充完成
(2)路由删除
有两个出发点
(1)NameServer定时扫描心跳信息,如果上一次心跳和当前时间差超过120s,移除该broker
(2)Broker在关闭的时候
公共方法
public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
移除四个信息中这个broker的元素
消息的发送
1、消息体的主要结构
关键属性
private String topic; private int flag; private Map<String, String> properties; private byte[] body;