rocketmq consumer 负载均衡

  DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理。触发负载均衡(doRebalance)的时机有以下两种:

1、每个DefaultMQPushConsumer启动后,会马上触发一次doRebalance动作;

      1.1、DefaultMQPushConsumerImpl.start()

 1     public synchronized void start() throws MQClientException {
 2         switch (this.serviceState) {
 3             case CREATE_JUST:
 4                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
 5                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
 6                 this.serviceState = ServiceState.START_FAILED;
 7 
 8                 this.checkConfig();
 9 
10                 this.copySubscription();
11 
12                 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
13                     this.defaultMQPushConsumer.changeInstanceNameToPID();
14                 }
15 
16                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
17 
18                 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
19                 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
20                 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
21                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
22 
23                 this.pullAPIWrapper = new PullAPIWrapper(
24                     mQClientFactory,
25                     this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
26                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
27 
28                 if (this.defaultMQPushConsumer.getOffsetStore() != null) {
29                     this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
30                 } else {
31                     switch (this.defaultMQPushConsumer.getMessageModel()) {
32                         case BROADCASTING:
33                             this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
34                             break;
35                         case CLUSTERING:
36                             this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
37                             break;
38                         default:
39                             break;
40                     }
41                     this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
42                 }
43                 this.offsetStore.load();
44 
45                 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
46                     this.consumeOrderly = true;
47                     this.consumeMessageService =
48                         new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
49                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
50                     this.consumeOrderly = false;
51                     this.consumeMessageService =
52                         new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
53                 }
54 
55                 this.consumeMessageService.start();
56 
57                 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
58                 if (!registerOK) {
59                     this.serviceState = ServiceState.CREATE_JUST;
60                     this.consumeMessageService.shutdown();
61                     throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
62                         + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
63                         null);
64                 }
65 
66                 mQClientFactory.start();
67                 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
68                 this.serviceState = ServiceState.RUNNING;
69                 break;
70             case RUNNING:
71             case START_FAILED:
72             case SHUTDOWN_ALREADY:
73                 throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
74                     + this.serviceState
75                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
76                     null);
77             default:
78                 break;
79         }

      1.2、MQClientInstance.start()

 1     public void start() throws MQClientException {
           // Starts this client instance. The state check and every start-up step below run
           // while holding the instance monitor (synchronized (this)).
 2 
 3         synchronized (this) {
 4             switch (this.serviceState) {
 5                 case CREATE_JUST:
                       // The state is set to START_FAILED first and only becomes RUNNING on the last
                       // step, so an exception thrown by any step in between leaves START_FAILED behind.
 6                     this.serviceState = ServiceState.START_FAILED;
 7                     // If not specified,looking address from name server
 8                     if (null == this.clientConfig.getNamesrvAddr()) {
 9                         this.mQClientAPIImpl.fetchNameServerAddr();
10                     }
11                     // Start request-response channel
12                     this.mQClientAPIImpl.start();
13                     // Start various schedule tasks
14                     this.startScheduledTask();
15                     // Start pull service
16                     this.pullMessageService.start();
17                     // Start rebalance service
                       // NOTE(review): presumably this is the RebalanceService whose run() loop is
                       // quoted in section 1.3 of this article - confirm against the field type.
18                     this.rebalanceService.start();
19                     // Start push service
                       // NOTE(review): despite the comment above, the statement below starts the
                       // implementation object of this.defaultMQProducer, passing false.
20                     this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
21                     log.info("the client factory [{}] start OK", this.clientId);
22                     this.serviceState = ServiceState.RUNNING;
23                     break;
                   // RUNNING and SHUTDOWN_ALREADY are both silent no-ops: no restart, no exception.
24                 case RUNNING:
25                     break;
26                 case SHUTDOWN_ALREADY:
27                     break;
                   // A previous start attempt did not reach RUNNING: refuse to start again.
28                 case START_FAILED:
29                     throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
30                 default:
31                     break;
32             }
33         }
34     }

      1.3、org.apache.rocketmq.common.ServiceThread.start()

              RebalanceService.run()

 1     @Override
 2     public void run() {
           // Service loop: until isStopped() returns true, call waitForRunning(waitInterval)
           // and then trigger one doRebalance() pass on the client factory.
           // NOTE(review): waitForRunning is not shown here; presumably it blocks for up to
           // waitInterval or until the thread is woken up - confirm against ServiceThread.
 3         log.info(this.getServiceName() + " service started");
 4 
 5         while (!this.isStopped()) {
 6             this.waitForRunning(waitInterval);
 7             this.mqClientFactory.doRebalance();
 8         }
 9 
10         log.info(this.getServiceName() + " service end");
11     }

  1.4、MQClientInstance.doRebalance()

 1     public void doRebalance() {
           // Runs doRebalance() on every non-null consumer registered in consumerTable.
 2         for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
 3             MQConsumerInner impl = entry.getValue();
 4             if (impl != null) {
                   // The try/catch sits inside the loop, so a failure in one consumer is logged
                   // and the remaining consumers are still rebalanced.
 5                 try {
 6                     impl.doRebalance();
 7                 } catch (Throwable e) {
 8                     log.error("doRebalance exception", e);
 9                 }
10             }
11         }
12     }

2、而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,各个Consumer都会被触发doRebalance动作。

客户端收到请求码为 NOTIFY_CONSUMER_IDS_CHANGED 的请求后,由下面的 processRequest 分发给 notifyConsumerIdsChanged 处理:

ClientRemotingProcessor.processRequest(ChannelHandlerContext, RemotingCommand)

 1     @Override
 2     public RemotingCommand processRequest(ChannelHandlerContext ctx,
 3         RemotingCommand request) throws RemotingCommandException {
           // Dispatches an incoming request to the matching handler based on request.getCode()
           // and returns whatever that handler returns.
 4         switch (request.getCode()) {
 5             case RequestCode.CHECK_TRANSACTION_STATE:
 6                 return this.checkTransactionState(ctx, request);
               // The request code relevant to load balancing: handled by notifyConsumerIdsChanged.
 7             case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
 8                 return this.notifyConsumerIdsChanged(ctx, request);
 9             case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
10                 return this.resetOffset(ctx, request);
11             case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
12                 return this.getConsumeStatus(ctx, request);
13 
14             case RequestCode.GET_CONSUMER_RUNNING_INFO:
15                 return this.getConsumerRunningInfo(ctx, request);
16 
17             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
18                 return this.consumeMessageDirectly(ctx, request);
19             default:
20                 break;
21         }
           // Any request code not listed above falls through to here and yields null.
22         return null;
23     }

 consumer负载均衡策略接口AllocateMessageQueueStrategy

 1 /**
 2  * Strategy Algorithm for message allocating between consumers
    *
    * An implementation decides which of a topic's message queues the current consumer
    * should own. In the rebalanceByTopic listing quoted in this article, the caller sorts
    * both mqAll and cidAll before invoking allocate().
 3  */
 4 public interface AllocateMessageQueueStrategy {
 5 
 6     /**
 7      * Allocating by consumer id
 8      *
 9      * @param consumerGroup current consumer group
10      * @param currentCID current consumer id
11      * @param mqAll message queue set in current topic
12      * @param cidAll consumer set in current consumer group
13      * @return The allocate result of given strategy
        *         (the queues assigned to currentCID; the implementations quoted in this
        *         article return an empty list when currentCID is not contained in cidAll)
14      */
15     List<MessageQueue> allocate(
16         final String consumerGroup,
17         final String currentCID,
18         final List<MessageQueue> mqAll,
19         final List<String> cidAll
20     );
21 
22     /**
23      * Algorithm name
24      *
25      * @return The strategy name
        *         (rebalanceByTopic passes it to its log calls as allocateMessageQueueStrategyName)
26      */
27     String getName();
28 }
View Code

具体的负载均衡策略有六种(见下文“分配算法”一节),

默认使用AllocateMessageQueueAveragely,负载均衡的结果与Topic的Message Queue数量,以及

ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有

Message Queue分配到不同Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系

影响负载均衡结果

3、以下以AllocateMessageQueueAveragely为例讲解

 3.1 RebalanceImpl.doRebalance(boolean)

 1     public void doRebalance(final boolean isOrder) {
           // Rebalances every subscribed topic, one rebalanceByTopic call per key of the
           // subscription table, then calls truncateMessageQueueNotMyTopic().
 2         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 3         if (subTable != null) {
 4             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 5                 final String topic = entry.getKey();
                   // Failures are caught per topic, so one failing topic does not stop the others.
 6                 try {
 7                     this.rebalanceByTopic(topic, isOrder);
 8                 } catch (Throwable e) {
                       // The exception is logged only when the topic name does not start with
                       // RETRY_GROUP_TOPIC_PREFIX; for such prefixed topics it is dropped silently.
 9                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
10                         log.warn("rebalanceByTopic Exception", e);
11                     }
12                 }
13             }
14         }
15 
           // Runs unconditionally, even when the subscription table is null.
16         this.truncateMessageQueueNotMyTopic();
17     }

  3.2、RebalanceImpl.rebalanceByTopic(String, boolean)

 1     private void rebalanceByTopic(final String topic, final boolean isOrder) {
           // Recomputes which message queues of `topic` this consumer should own, according to
           // the message model, and applies the result via updateProcessQueueTableInRebalance.
 2         switch (messageModel) {
               // BROADCASTING: no allocation step; the full queue set of the topic is used as-is.
 3             case BROADCASTING: {
 4                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 5                 if (mqSet != null) {
 6                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
 7                     if (changed) {
 8                         this.messageQueueChanged(topic, mqSet, mqSet);
 9                         log.info("messageQueueChanged {} {} {} {}",
10                             consumerGroup,
11                             topic,
12                             mqSet,
13                             mqSet);
14                     }
15                 } else {
16                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17                 }
18                 break;
19             }
20             case CLUSTERING: {
                   // All message queues of this topic, including queues on different brokers
21                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                   // Look up the consumer id list for this topic and consumerGroup
22                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
23                 if (null == mqSet) {
24                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
25                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
26                     }
27                 }
28 
29                 if (null == cidAll) {
30                     log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
31                 }
32 
                   // Allocation only happens when both the queue set and the consumer list are known.
33                 if (mqSet != null && cidAll != null) {
34                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
35                     mqAll.addAll(mqSet);
36 
                       // Both lists are sorted before being handed to the strategy.
                       // NOTE(review): presumably so that every consumer in the group computes
                       // over the same ordering - confirm.
37                     Collections.sort(mqAll);
38                     Collections.sort(cidAll);
39 
40                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
41 
42                     List<MessageQueue> allocateResult = null;
43                     try {
44                         allocateResult = strategy.allocate(
45                             this.consumerGroup,
46                             this.mQClientFactory.getClientId(),
47                             mqAll,
48                             cidAll);
49                     } catch (Throwable e) {
                           // A failing strategy aborts this rebalance pass for the topic;
                           // the process queue table is left untouched.
50                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
51                             e);
52                         return;
53                     }
54 
                       // A null result from the strategy is treated as an empty allocation.
55                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
56                     if (allocateResult != null) {
57                         allocateResultSet.addAll(allocateResult);
58                     }
59 
60                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
61                     if (changed) {
62                         log.info(
63                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
64                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
65                             allocateResultSet.size(), allocateResultSet);
66                         this.messageQueueChanged(topic, mqSet, allocateResultSet);
67                     }
68                 }
69                 break;
70             }
71             default:
72                 break;
73         }
74     }

  3.3、AllocateMessageQueueAveragely.allocate(String, String, List<MessageQueue>, List<String>)(AllocateMessageQueueStrategy 接口的默认实现)

 1 /**
 2  * Average Hashing queue algorithm
    *
    * Gives each consumer one contiguous slice of mqAll. With q = mqAll.size() and
    * c = cidAll.size(), the first (q % c) consumers by position in cidAll each receive one
    * more queue than the others. Example: 8 queues and 3 consumers yield the slices
    * [0,1,2], [3,4,5] and [6,7].
 3  */
 4 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
 5     private final InternalLogger log = ClientLogger.getLog();
 6 
 7     @Override
 8     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
 9         List<String> cidAll) {
               // Argument validation: a null/empty currentCID, mqAll or cidAll is rejected with
               // IllegalArgumentException.
10         if (currentCID == null || currentCID.length() < 1) {
11             throw new IllegalArgumentException("currentCID is empty");
12         }
13         if (mqAll == null || mqAll.isEmpty()) {
14             throw new IllegalArgumentException("mqAll is null or mqAll empty");
15         }
16         if (cidAll == null || cidAll.isEmpty()) {
17             throw new IllegalArgumentException("cidAll is null or cidAll empty");
18         }
19 
20         List<MessageQueue> result = new ArrayList<MessageQueue>();
               // A consumer that is not part of cidAll gets an empty allocation (logged, not thrown).
21         if (!cidAll.contains(currentCID)) {
22             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
23                 consumerGroup,
24                 currentCID,
25                 cidAll);
26             return result;
27         }
28 
               // index: position of this consumer in cidAll; mod: number of left-over queues.
29         int index = cidAll.indexOf(currentCID);
30         int mod = mqAll.size() % cidAll.size();
               // averageSize: 1 when there are no more queues than consumers; otherwise q / c,
               // plus one for the first `mod` consumers.
31         int averageSize =
32             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
33                 + 1 : mqAll.size() / cidAll.size());
               // Consumers past the first `mod` ones start `mod` positions later, to skip the
               // extra queues handed to the consumers before them.
34         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
               // range is clamped to the queues remaining after startIndex; it is <= 0 (so nothing
               // is added) when startIndex is at or beyond the end of mqAll.
35         int range = Math.min(averageSize, mqAll.size() - startIndex);
36         for (int i = 0; i < range; i++) {
37             result.add(mqAll.get((startIndex + i) % mqAll.size()));
38         }
39         return result;
40     }
41 
42     @Override
43     public String getName() {
44         return "AVG";
45     }
46 }

分配算法

平均分配策略(默认)(AllocateMessageQueueAveragely)
环形分配策略(AllocateMessageQueueAveragelyByCircle)
手动配置分配策略(AllocateMessageQueueByConfig)
机房分配策略(AllocateMessageQueueByMachineRoom)
一致性哈希分配策略(AllocateMessageQueueConsistentHash)
靠近机房策略(AllocateMachineRoomNearby)

平均分配、环形分配如下(以8个消息队列、3个消费者为例)

普通消费方式

Message Queue  ConsumerId
消息队列[0] Consumer[0]
消息队列[1] Consumer[0]
消息队列[2] Consumer[0]
消息队列[3] Consumer[1]
消息队列[4] Consumer[1]
消息队列[5] Consumer[1]
消息队列[6] Consumer[2]
消息队列[7] Consumer[2]


- 环形消费方式

Message Queue  ConsumerId
消息队列[0]  Consumer[0]
消息队列[1]  Consumer[1]
消息队列[2]  Consumer[2]
消息队列[3]  Consumer[0]
消息队列[4]  Consumer[1]
消息队列[5]  Consumer[2]
消息队列[6]  Consumer[0]
消息队列[7]  Consumer[1]

机房分配策略

 1     @Override
 2     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
 3         List<String> cidAll) {
           // Machine-room allocation: keep only the queues whose broker name matches one of
           // the configured rooms (consumeridcs), then split that filtered list evenly across
           // cidAll, handing the left-over queues one each to the first consumers.
           // Returns an empty list when currentCID is not contained in cidAll.
 4         List<MessageQueue> result = new ArrayList<MessageQueue>();
 5         int currentIndex = cidAll.indexOf(currentCID);
 6         if (currentIndex < 0) {
 7             return result;
 8         }
           // A queue qualifies only if its broker name has exactly two "@"-separated parts
           // and the first part is contained in consumeridcs.
 9         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
10         for (MessageQueue mq : mqAll) {
11             String[] temp = mq.getBrokerName().split("@");
12             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
13                 premqAll.add(mq);
14             }
15         }
16 
           // mod: queues every consumer gets; rem: queues left over after the even split.
17         int mod = premqAll.size() / cidAll.size();
18         int rem = premqAll.size() % cidAll.size();
19         int startIndex = mod * currentIndex;
20         int endIndex = startIndex + mod;
21         for (int i = startIndex; i < endIndex; i++) {
               // Fix: startIndex/endIndex are offsets into the filtered list, so read from
               // premqAll. Reading from mqAll (as the listing originally did) could return
               // queues that were filtered out above.
22             result.add(premqAll.get(i));
23         }
           // The first `rem` consumers each take one of the left-over queues at the tail.
24         if (rem > currentIndex) {
25             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
26         }
27         return result;
28     }

第5-8行, 计算当前消费者在消费者集合中的下标(currentIndex), 如果下标 < 0 , 则直接返回空结果

第9-15行, 根据brokerName(按“@”拆分, 取前一段作为机房标识)筛选出位于有效机房(consumeridcs)中的消息队列, 结果存储在premqAll中

第17行, 计算消息队列数整除消费者数的结果mod(即每个消费者平均分到的队列数)

第18行, 计算消息队列是否能够被平均分配的余数rem(即平均分配后还剩多少消息队列, remaining)

第19行, 计算当前消费者开始消费的下标(startIndex)

第20行, 计算当前消费者结束消费的下标(endIndex)

第21-26行, 将消息队列的分配分为两部分, 第一部分 – 前 (cidAllSize * mod) 个队列, 第二部分 – 剩余的 (premqAll.size() - cidAllSize * mod) 个队列 ;

从第一部分中查询startIndex ~ endIndex之间所有的消息, 从第二部分中查询 currentIndex + mod * cidAll.size() , 最后返回查询的结果result

可以通过下面的例子进一步了解,假设有三个消费者, 八个消息队列

Message Queue  Consumer
消息队列[0] Consumer[0]
消息队列[1] Consumer[0]
消息队列[2] Consumer[1]
消息队列[3] Consumer[1]
消息队列[4] Consumer[2]
消息队列[5] Consumer[2]
消息队列[6] Consumer[0]
消息队列[7] Consumer[1]

靠近机房算法

 1 

/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
* specified.
*
* If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
* should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
* no alive consumer to monopolize them.
*/

    @Override
 2     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
 3         List<String> cidAll) {
           // Nearby-machine-room allocation: groups queues and consumers by machine room, then
           // delegates the actual split to allocateMessageQueueStrategy in two steps:
           // (1) queues in the current consumer's own room, shared among that room's consumers;
           // (2) queues in rooms that have no consumer, shared among all consumers.
           // Queues in other rooms that do have consumers are not allocated to this consumer.
 4         if (currentCID == null || currentCID.length() < 1) {
 5             throw new IllegalArgumentException("currentCID is empty");
 6         }
 7         if (mqAll == null || mqAll.isEmpty()) {
 8             throw new IllegalArgumentException("mqAll is null or mqAll empty");
 9         }
10         if (cidAll == null || cidAll.isEmpty()) {
11             throw new IllegalArgumentException("cidAll is null or cidAll empty");
12         }
13 
14         List<MessageQueue> result = new ArrayList<MessageQueue>();
           // A consumer that is not part of cidAll gets an empty allocation (logged, not thrown).
15         if (!cidAll.contains(currentCID)) {
16             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
17                 consumerGroup,
18                 currentCID,
19                 cidAll);
20             return result;
21         }
22 
23         //group mq by machine room
           // A queue whose room resolves to an empty value is rejected with IllegalArgumentException.
24         Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
25         for (MessageQueue mq : mqAll) {
26             String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
27             if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
28                 if (mr2Mq.get(brokerMachineRoom) == null) {
29                     mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
30                 }
31                 mr2Mq.get(brokerMachineRoom).add(mq);
32             } else {
33                 throw new IllegalArgumentException("Machine room is null for mq " + mq);
34             }
35         }
36 
37         //group consumer by machine room
           // Same rule for consumers: an empty room value is rejected with IllegalArgumentException.
38         Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
39         for (String cid : cidAll) {
40             String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
41             if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
42                 if (mr2c.get(consumerMachineRoom) == null) {
43                     mr2c.put(consumerMachineRoom, new ArrayList<String>());
44                 }
45                 mr2c.get(consumerMachineRoom).add(cid);
46             } else {
47                 throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
48             }
49         }
50 
51         List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
52 
53         //1.allocate the mq that deploy in the same machine room with the current consumer
54         String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
           // remove() (not get()) takes the current room out of mr2Mq, so the loop in step 2
           // only visits the other rooms.
55         List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
56         List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
57         if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
58             allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
59         }
60 
61         //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
62         for (String machineRoom : mr2Mq.keySet()) {
63             if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
64                 allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
65             }
66         }
67 
           // NOTE(review): `result` (line 14) is only returned on the early exit above; the
           // normal path returns allocateResults.
68         return allocateResults;
69     }
原文地址:https://www.cnblogs.com/toUpdating/p/9989477.html