RocketMQ Summary (5) -- Message Queue Load Balancing and Reallocation

  In RocketMQ, message queue redistribution (rebalancing) is implemented by RebalanceService. Each MQClientInstance holds one RebalanceService instance, which is started when the MQClientInstance starts:
public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;
    // ... (remaining fields and methods omitted)
}
  RebalanceService#run:
@Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

  By default, the RebalanceService thread calls mqClientFactory.doRebalance() every 20 s (waitInterval). The key point to remember is this 20-second cycle.
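A minimal sketch of the "wait up to the interval, then rebalance" loop, written with a plain Java monitor rather than RocketMQ's own ServiceThread base class. RebalanceLoopSketch, waitForRunning, wakeup and shutdown here are hypothetical stand-ins; wakeup() models the fact that the client can also request an immediate rebalance (MQClientInstance#rebalanceImmediately wakes the service) instead of waiting out the full 20 s. As far as we know, RocketMQ 4.x reads the interval from the system property rocketmq.client.rebalance.waitInterval (default 20000 ms); verify this against the version you run.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for RebalanceService; not RocketMQ code.
class RebalanceLoopSketch implements Runnable {
    private final long waitIntervalMillis;
    private final Runnable rebalanceAction; // plays the role of mqClientFactory.doRebalance()
    private boolean notified = false;       // guarded by this object's monitor
    private volatile boolean stopped = false;

    RebalanceLoopSketch(long waitIntervalMillis, Runnable rebalanceAction) {
        this.waitIntervalMillis = waitIntervalMillis;
        this.rebalanceAction = rebalanceAction;
    }

    // Ask the loop to rebalance now instead of waiting out the interval.
    synchronized void wakeup() {
        notified = true;
        notifyAll();
    }

    // Block for at most intervalMillis; return early if wakeup() was (or already had been) called.
    synchronized void waitForRunning(long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        try {
            while (!notified) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break; // interval elapsed: time for the periodic rebalance
                }
                wait(remainingMillis); // loop guards against spurious wakeups
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to run()
        } finally {
            notified = false; // consume the wakeup signal
        }
    }

    void shutdown() {
        stopped = true;
        wakeup(); // cut the current wait short so the thread can exit
    }

    @Override
    public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            waitForRunning(waitIntervalMillis);
            if (stopped) {
                break;
            }
            rebalanceAction.run();
        }
    }
}
```

A pending wakeup() makes the next waitForRunning() return immediately, so a rebalance requested while the previous round is still running is not lost.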

public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }

  For each consumer this eventually reaches RebalanceImpl#rebalanceByTopic; its CLUSTERING branch is shown below:

case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
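Step 1: Get the topic's queues from the topic subscription info cache (topicSubscribeInfoTable), then send a request to a Broker to fetch the client IDs of all consumers currently in this consumer group (findConsumerIdList). A topic's queues may be spread across several Brokers, so which Broker receives the request? RocketMQ picks one Broker at random from the topic's route information. And why does a Broker know about every consumer in the group? Recall that when a consumer starts, it registers itself with its MQClientInstance, and the MQClientInstance then sends heartbeats to all Brokers; each heartbeat carries the consumer information of that MQClientInstance. If either mqSet or cidAll is null, this round of queue rebalancing is skipped.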
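To make the heartbeat argument concrete, here is a minimal sketch of a Broker-side registry that remembers which client IDs belong to which consumer group, assuming each heartbeat carries (group, clientId) and that members whose heartbeats stop are dropped after a timeout. ConsumerGroupRegistrySketch, onHeartbeat and findConsumerIdList(group, now) are hypothetical names; the real Broker tracks Netty channels and is considerably more involved, and the expiry timeout here is just a constructor parameter chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical Broker-side view of consumer groups, fed by client heartbeats; not RocketMQ code.
class ConsumerGroupRegistrySketch {
    private final long expireMillis;
    // group -> (clientId -> time of last heartbeat, in ms)
    private final Map<String, Map<String, Long>> groups = new ConcurrentHashMap<>();

    ConsumerGroupRegistrySketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    // Each heartbeat registers (or refreshes) a client's membership in a consumer group.
    void onHeartbeat(String group, String clientId, long nowMillis) {
        groups.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(clientId, nowMillis);
    }

    // Answer "who is in my group?", ignoring clients whose heartbeats have expired.
    List<String> findConsumerIdList(String group, long nowMillis) {
        Map<String, Long> members = groups.get(group);
        if (members == null) {
            return null; // the client logs this and skips the rebalance round
        }
        members.entrySet().removeIf(e -> nowMillis - e.getValue() > expireMillis);
        return new ArrayList<>(members.keySet()); // order is unspecified; clients sort it themselves
    }
}
```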
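Step 2: First sort cidAll and mqAll. This matters: it makes every consumer in the same group see the same view, so that the same message queue is not assigned to more than one consumer. The allocation itself is delegated to RocketMQ's message queue allocation strategy interface, AllocateMessageQueueStrategy, whose allocate method receives the consumer group, the current client ID, mqAll and cidAll.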
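Why sorting matters can be shown with a tiny experiment. Each consumer runs the allocation on its own, so if two consumers receive the same queues in different orders (mqSet is a Set, and the consumer ID list comes back from the Broker in no particular order), they can pick overlapping queues. The helper below is hypothetical and works on plain strings; its "queue i goes to consumer i % n" rule is similar in spirit to RocketMQ's AllocateMessageQueueAveragelyByCircle, but it is only an illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical circle-style allocation used only to show why both lists are sorted first.
final class SortedViewDemo {
    private SortedViewDemo() {}

    // The queue at position i goes to the consumer at position (i % consumerCount) of cidAll.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll, boolean sortFirst) {
        List<String> mqs = new ArrayList<>(mqAll);   // copy: do not mutate the caller's lists
        List<String> cids = new ArrayList<>(cidAll);
        if (sortFirst) {
            Collections.sort(mqs);
            Collections.sort(cids);
        }
        List<String> result = new ArrayList<>();
        int index = cids.indexOf(currentCid);
        if (index < 0) {
            return result; // this client is not in the group view
        }
        for (int i = 0; i < mqs.size(); i++) {
            if (i % cids.size() == index) {
                result.add(mqs.get(i));
            }
        }
        return result;
    }
}
```

With c1 seeing [q0, q1, q2, q3] and c2 seeing [q1, q0, q3, q2], the unsorted run hands q0 and q2 to both consumers while nobody consumes q1 and q3; after sorting, c1 gets q0 and q2 and c2 gets q1 and q3.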
 
RocketMQ's default allocation strategy is AllocateMessageQueueAveragely.
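A minimal re-implementation of the averaging idea, assuming AllocateMessageQueueAveragely behaves as follows: each consumer takes a contiguous slice of the sorted queue list, and the first mqAll.size() % cidAll.size() consumers get one extra queue. AveragelySketch is a hypothetical name, queues are plain strings instead of MessageQueue objects, and edge cases (for example an empty queue list, which this sketch answers with an empty result) may be handled differently by the real class; check the source of your RocketMQ version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of the averaging allocation on plain strings; not the RocketMQ class.
final class AveragelySketch {
    private AveragelySketch() {}

    // mqAll and cidAll are assumed to be sorted already (Step 2).
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // not in the group view, or nothing to assign
        }
        int mqCount = mqAll.size();
        int cidCount = cidAll.size();
        int mod = mqCount % cidCount;
        // The first `mod` consumers take one extra queue; with no more queues than consumers, each takes at most one.
        int averageSize = mqCount <= cidCount ? 1
                : (mod > 0 && index < mod ? mqCount / cidCount + 1 : mqCount / cidCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqCount - startIndex); // <= 0 means this consumer gets nothing
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }
}
```

With 8 queues and 3 consumers this gives c1 the queues q0, q1, q2, c2 the queues q3, q4, q5 and c3 the queues q6, q7; with 2 queues and 3 consumers the third consumer stays idle.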
 

Step 3: Update the set of queues this consumer client is processing, according to the MessageQueues it was allocated.

  RebalanceImpl#updateProcessQueueTableInRebalance:
  
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;

        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                }
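Step 3 (continued): ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable is the cache of message queues currently assigned to this consumer. If a MessageQueue in this cache is not contained in mqSet, this rebalance round has assigned that queue to another consumer, so consumption of it must be paused: its ProcessQueue is marked dropped=true, after which the messages in that ProcessQueue are no longer consumed. removeUnnecessaryMessageQueue is then called to decide whether the MessageQueue and its ProcessQueue can be removed from the cache. removeUnnecessaryMessageQueue is declared abstract in RebalanceImpl; its main job is to persist the consumption progress of the MessageQueue being removed.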
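The removal pass can be sketched in isolation. This is a stripped-down model, assuming queues of a single topic are represented as strings and that the "persist progress, then allow removal" decision is supplied by the caller; DropUnassignedSketch, ProcessQueueStub and OffsetPersister are hypothetical names standing in for processQueueTable, ProcessQueue and removeUnnecessaryMessageQueue.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, stripped-down model of the "drop queues no longer assigned" pass; not RocketMQ code.
final class DropUnassignedSketch {
    static final class ProcessQueueStub {
        volatile boolean dropped = false; // consuming threads stop processing a dropped queue
    }

    // Stand-in for removeUnnecessaryMessageQueue: persist the offset, return true if removal may proceed.
    interface OffsetPersister {
        boolean persistAndRelease(String queue);
    }

    private DropUnassignedSketch() {}

    // Returns true if processQueueTable changed.
    static boolean dropUnassigned(Map<String, ProcessQueueStub> processQueueTable,
                                  Set<String> allocated, OffsetPersister persister) {
        boolean changed = false;
        Iterator<Map.Entry<String, ProcessQueueStub>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ProcessQueueStub> entry = it.next();
            if (!allocated.contains(entry.getKey())) {
                entry.getValue().dropped = true;                   // 1. stop consuming this queue
                if (persister.persistAndRelease(entry.getKey())) { // 2. save its progress first
                    it.remove();                                   // 3. forget the queue locally
                    changed = true;
                }
            }
        }
        return changed;
    }
}
```

Note that a queue whose progress could not be persisted stays in the table but is still marked dropped, so it is no longer consumed.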

  

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
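Step 4: Iterate over the queues allocated in this round. If processQueueTable does not contain a queue, that queue is new in this round. (For orderly consumption, the code first tries to lock the queue on the Broker and skips it for this round if locking fails.) The consumer first removes the queue's consumption progress from memory (removeDirtyOffset), then reads its progress from the offset store (in clustering mode this is fetched from the Broker) and creates a PullRequest object. The key point: if the offset read is less than 0, the starting offset has to be corrected. RocketMQ provides three strategies for this, CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET and CONSUME_FROM_TIMESTAMP, which can be set when creating a consumer by calling DefaultMQPushConsumer#setConsumeFromWhere. The logic that computes the PullRequest's nextOffset lives in RebalancePushImpl#computePullFromWhere.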
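A minimal sketch of the offset-correction decision, based on our reading of RebalancePushImpl#computePullFromWhere in RocketMQ 4.x (other versions may differ). In the real code the stored offset, the queue's max offset and the offset for a timestamp come from the offset store and the Broker; here they are plain hypothetical parameters, and PullFromWhereSketch is an illustrative name. A stored offset of -1 is taken to mean "no progress recorded yet".

```java
// Hypothetical model of choosing the first offset to pull for a newly assigned queue; not RocketMQ code.
final class PullFromWhereSketch {
    enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP }

    private PullFromWhereSketch() {}

    /**
     * @param storedOffset      progress read from the offset store; -1 means nothing recorded yet
     * @param maxOffset         current max offset of the queue
     * @param offsetAtTimestamp offset that a search by the configured timestamp would return
     * @param retryTopic        whether the queue belongs to a %RETRY% topic
     * @return the nextOffset for the PullRequest, or -1 if it cannot be determined
     */
    static long computePullFromWhere(ConsumeFromWhere where, long storedOffset, long maxOffset,
                                     long offsetAtTimestamp, boolean retryTopic) {
        if (storedOffset >= 0) {
            return storedOffset; // saved progress always wins, whatever the strategy
        }
        if (storedOffset != -1) {
            return -1; // other negative values are treated as an error
        }
        switch (where) {
            case CONSUME_FROM_LAST_OFFSET:
                return retryTopic ? 0L : maxOffset;          // new group: skip the backlog
            case CONSUME_FROM_FIRST_OFFSET:
                return 0L;                                   // new group: start from the beginning
            case CONSUME_FROM_TIMESTAMP:
                return retryTopic ? maxOffset : offsetAtTimestamp;
            default:
                return -1;
        }
    }
}
```

The practical consequence is that setConsumeFromWhere only matters the first time a group consumes a queue; once progress has been stored, the consumer resumes from it.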

  To wrap up, two questions:

  Question 1: When is a PullRequest object created and added to pullRequestQueue so that the PullMessageService thread is woken up?

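  Every 20 s, the RebalanceService thread reallocates the queues of each topic the consumer subscribes to. Each round fetches all queues of the topic, queries the Broker in real time for the current list of consumers in this consumer group, and creates a PullRequest object for each newly assigned queue. Within one JVM process, there is only one PullRequest object per queue per consumer group.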
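The "one PullRequest per queue per group" guarantee follows from the putIfAbsent on processQueueTable in the Step 4 code: only the call that actually inserts the queue creates a PullRequest. A minimal sketch of that idea, with hypothetical names (PullRequestDispatchSketch, onRebalance) and a LinkedBlockingQueue standing in for PullMessageService's pullRequestQueue; removal of queues that are no longer assigned, and the real offset computation, are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of "one PullRequest per (group, queue) per JVM"; not RocketMQ code.
final class PullRequestDispatchSketch {
    static final class PullRequest {
        final String consumerGroup;
        final String messageQueue;
        final long nextOffset;

        PullRequest(String consumerGroup, String messageQueue, long nextOffset) {
            this.consumerGroup = consumerGroup;
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
        }
    }

    private final String consumerGroup;
    // Stand-in for processQueueTable: presence means "this JVM already pulls this queue".
    private final ConcurrentMap<String, Boolean> processQueueTable = new ConcurrentHashMap<>();
    // Stand-in for pullRequestQueue; a pull thread blocking on take() is woken by new entries.
    final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();

    PullRequestDispatchSketch(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    // Called once per rebalance round; returns how many PullRequests were created.
    int onRebalance(Set<String> allocated, long startOffset) {
        List<PullRequest> created = new ArrayList<>();
        for (String mq : allocated) {
            // Only the caller that inserts the entry creates a PullRequest for this queue.
            if (processQueueTable.putIfAbsent(mq, Boolean.TRUE) == null) {
                created.add(new PullRequest(consumerGroup, mq, startOffset));
            }
        }
        pullRequestQueue.addAll(created);
        return created.size();
    }
}
```

Running the same allocation twice creates nothing the second time, and a later round that adds one queue creates exactly one new PullRequest.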

  Question 2: How do multiple consumers in a cluster share the queues of a topic, and how are the queues redistributed when a new consumer joins?
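  Because every rebalance queries the Broker in real time for all consumers currently in the group, and sorts both the queue list and the consumer list, a newly joined consumer is assigned queues at the next redistribution and starts consuming messages from them.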
Original article: https://www.cnblogs.com/juniorMa/p/15127356.html