RocketMQ:(4) Consumer

一、 定时消息机制

  定时消息是指消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费,RocketMQ并不支持任意的时间精度,如果要支持任意时间精度定时调度,不可避免地需要在Broker层做消息排序,再加上持久化方面的考量,将不可避免地带来巨大的性能消耗,所以RocketMQ只支持特定级别的延迟消息。消息延迟级别在Broker端通过messageDelayLevel配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1表示延迟消息1s,delayLevel=2表示延迟5s,依次类推。

  消息重试正是借助定时任务实现的,在将消息存入commitlog文件之前需要判断消息的重试次数,如果大于0,则会将消息的主题设置为SCHEDULE_TOPIC_XXXX。

ScheduleMessageService

  RocketMQ定时消息实现类为ScheduleMessageService,调用顺序为:构造方法 -> load() -> start()。

load方法

  该方法主要完成延迟消息消费队列消息进度的加载与delayLevelTable数据的构造,延迟队列消息消费进度默认存储路径为${ROCKET_HOME}/store/config/delayOffset.json,同时解析messageDelayLevel定义的延迟级别转换为Map,延迟级别1,2,3等对应的延迟时间。

start方法

  根据延迟级别创建对应的延时任务,启动定时任务持久化延迟消息队列进度存储。
  Step1:根据延迟队列创建定时任务,遍历延迟级别,根据延迟级别 level 从 offsetTable 中获取消费队列的消费进度,如果不存在,则使用0。也就是说每一个延迟级别对应一个消息消费队列。然后创建定时任务,每一个定时任务第一次启动时默认延迟1s先执行一次定时任务,第二次调度开始才使用相应的延迟时间。延迟级别与消息消费队列的映射关系为:消息队列ID = 延迟级别 - 1。
  Step2:创建定时任务,每隔10s持久化一次延迟队列的消息消费进度。

定时调度逻辑

  start方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别其实对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。
  Step1:根据队列ID与延迟主题查找消息消费队列,如果未找到,说明目前并不存在该延时级别的消息,忽略本次任务。
  Step2:根据offset从消息消费队列中获取当前队列中所有有效的消息。
  Step3:遍历ConsumeQueue,解析出消息的物理偏移量、消息长度、消息tag hashcode, 为从commitlog加载具体的消息做准备。
  Step4:根据消息物理偏移量与消息大小从commitlog文件中查找消息。
  Step5:根据消息重新构建新的消息对象,清除消息的延迟级别属性(delayLevel)、并恢复消息原先的消息主题与消息消费队列。
  Step6:将消息再次存入到commitlog,并转发到主题对应的消息队列上,供消费者再次消费。
  Step7:更新延迟队列拉取进度。

总结

  定时消息的第一个设计关键点是,定时消息单独一个主题:SCHEDULE_TOPIC_XXXX,该主题下队列数量等于配置的延迟级别数量。其对应关系为queueId等于延迟级别减1。ScheduleMessageService为每一个延迟级别创建一个定时Timer根据延迟级别对应的延迟时间进行延迟调度。在消息发送时,如果消息的延迟级别delayLevel大于0,将消息的原主题名称、队列ID存入消息的属性中,然后改变消息的主题、队列与延迟主题与延迟主题所属队列,消息将最终转发到延迟队列的消费队列

  定时消息第二个设计关键点:消息存储时如果消息的延迟级别属性delayLevel大于0,则会备份原主题、原队列到消息属性中,通过为不同的延迟级别创建不同的调度任务,当时间到达后执行调度任务,调度任务主要就是根据延迟拉取消息消费进度从延迟队列中拉取消息,然后从commitlog中加载完整消息,清除延迟级别属性并恢复原先的主题、队列,再次创建一条新的消息存入到commitlog中并转发到消息消费队列供消息消费者消费。 

  1)消息发送者发送消息,如果发送的消息delayLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息队列为delayLevel-1。
  2)消息经由Commitlog转发到消息消费队列SCHEDULE_TOPIC_XXXX的消息消费队列0。
  3)定时任务Timer每隔1秒根据上次拉取偏移量从消费队列中取出所有消息。
  4)根据消息的物理偏移量和消息大小从Commitlog中拉取消息。
  5)根据消息属性重新创建消息,并恢复原主题topic、原队列ID,清除delayLevel属性,存入Commitlog文件。
  6)转发到原主题topic的消息消费队列,供消息消费者消费。

 

 

二、顺序消息

  RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列中的消息被顺序消费,如果需要做到全局顺序消费则可以将主题配置成一个队列。

  要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。

  根据并发消息消费的流程,消息消费包含如下4个步骤:消息队列负载、消息拉取、消息消费、消息消费进度存储。

消息队列负载 

  RocketMQ首先需要通过RebalanceService线程实现消息队列的负载,集群模式下同一个消费组内的消费者共同承担其订阅主题下消息队列的消费,同一个消息消费队列在同一时刻只会被消费组内一个消费者消费,一个消费者同一时刻可以分配多个消费队列。
  如果经过消息队列重新负载(分配)后,分配到新的消息队列时,首先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,否则将跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。
  顺序消息消费与并发消息消费的第一个关键区别:顺序消息在创建消息队列拉取任务时需要在Broker服务器锁定该消息队列。

消息拉取

  RocketMQ消息拉取由PullMessageService线程负责,根据消息拉取任务循环拉取消息。如果消息处理队列未被锁定,则延迟3s后再将PullRequest对象放入到拉取任务中,如果该处理队列是第一次拉取任务,则首先计算拉取偏移量,然后向消息服务端拉取消息。

消息消费

  顺序消息消费的实现类:org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService

1、ConsumeMessageOrderlyService 构造方法
  初始化实例参数,这里的关键是消息任务队列为LinkedBlockingQueue,消息消费线程池最大运行时线程个数为consumeThreadMin。

2、ConsumeMessageOrderlyService 启动方法
  如果消费模式为集群模式,启动定时任务,默认每隔20s执行一次锁定分配给自己的消息消费队列。集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService以每20s的频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。
  Step1:向Broker(Master主节点)发送锁定消息队列,该方法返回成功被当前消费者锁定的消息消费队列。
  Step2:将成功锁定的消息消费队列相对应的处理队列设置为锁定状态,同时更新加锁时间。
  Step3:遍历当前处理队列中的消息消费队列,如果当前消费者不持有该消息队列的锁,将处理队列锁状态设置为false,暂停该消息消费队列的消息拉取与消息消费。

3、ConsumeMessageOrderlyService 提交消费任务
  构建消费任务 ConsumeRequest, 并提交到消费线程池中。顺序消息的ConsumeRequest消费任务不会直接消费本次拉取的消息,而是在消息消费时从处理队列中拉取。
  Step1:如果消息处理队列为丢弃,则停止本次消费任务。
  Step2:根据消息队列获取一个对象,然后在消息消费时先申请独占锁 objLock。顺序消息消费的并发度为消息队列。也就是一个消息消费队列同一时刻只会被一个消费线程池中一个线程消费。
  Step3:如果是广播模式,直接进入消费,无须锁定处理队列,因为相互之间无竞争;如果是集群模式,消息消费的前提条件是 processQueue 被锁定并且锁未超时。
  Step4:顺序消息消费处理逻辑,每一个 ConsumeRequest 消费任务不是以消费消息条数来计算的,而是根据消费时间,默认60s后,本次消费任务结束,由消费组内其他线程继续消费。
  Step5:每次从处理队列中按顺序取出 consumeBatchSize 消息, 如果未取到消息,则设置 continueConsume 为 false,本次消费结束;
  Step6:执行消息消费钩子函数(消息消费之前before方法)
  Step7:申请消息消费锁,如果消息队列被丢弃,放弃该消息消费队列的消费。然后执行消息消费监听器,调用业务方具体消息监听器执行真正的消息消费处理逻辑,并通知RocketMQ消息消费结果。
  Step8:执行消息消费钩子函数
  Step9:如果消息消费结果为SUCCESS,执行ProcessQueue的commit方法,并返回待更新的消息消费进度。提交,就是将该批消息从ProcessQueue中移除。
  检查消息的重试次数,如果消息重试次数大于或等于允许的最大重试次数,将该消息发送到Broker端,该消息在消息服务端最终会进入到DLQ(死信队列),也就是RocketMQ不会再次消费,需要人工干预。
  Step10:存储消息消费进度。

消息队列锁实现

  消费端通过使用MessageListenerOrderly类解决单MessageQueue 的消息被并发处理的问题,在MessageListenerOrderly实现中名,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理。

总结

  并发消息消费指消费线程池中的线程可以并发地对同一个消息消费队列的消息进行消费,消费成功后,取出消息处理队列中最小的消息偏移量作为消息消费进度偏移量存在于消息消费进度存储文件中,集群模式消息进度存储在Broker,广播模式消息进度存储在消费者端。
  顺序消息消费一般使用集群模式,是指消息消费者内的线程池中的线程对消息消费队列只能串行消费。与并发消息消费最本质的区别是消费消息时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。

 

 

原文地址:https://www.cnblogs.com/zjxiang/p/15028892.html