rocketMQ配置事故

公司的binlog消息通知,基于canal采集然后转发到rocketmq推送给业务进行消费。

基于此机制,为了实现实时计算通用源端处理,订阅了若干rocketmq的topic进行数据的幂等事务性投递到实时计算的消息队列。

订阅了之后,进程在线上运行几分钟之后就OOM了,于是按如下步骤分析原因:

  • jmap

jmap -dump:format=b,file=/home/xxxxxx/logs/01.hprof 6436

在系统运行过程中,观察内存进展,每隔一定程度,就提取一份dump文件,用于后续做连续分析。

  • jprofiler

用jprofiler打开这些dump文件,查看Biggest Objects,按Group by class分组聚合后发现大对象是ProcessQueue。

通过连续多个dump文件的分析,发现ProcessQueue对象的数量,在内存显著变化的过程中,没有明显变化,但是对象的大小有明显变化。

  • incoming分析

基于上述观察结果,分析聚合的引用ProcessQueue集合。结合rocketMQ输出的日志发觉,部分topic定义的读队列数量是512,配置了4个broker,那作为消费者,这些topic的定义导致消费端创建2048个ProcessQueue。基于这结合dump中的ProcessQueue实例数,可以得出ProcessQueue并没有创建失控,incoming端不是导致oom的原因。

  • outgoing分析

观察ProcessQueue的代码与ProcessQueue的聚合引用树可以看出来,是里面的消息有积压导致无法GC进而oom。

那基于这个分析,大概率是outgoing references端导致了oom。

  • 配置项分析

围绕ProcessQueue中的Treemap,寻找增加数据的入口发现,RocketMQ是基于DefaultMQPushConsumerImpl拉取的消息,pullMessage时围绕2个配置项进行拉取控制。

  • pullThresholdForQueue

当ProcessQueue内部的消息计数器大于该值时,将当前拉取请求放入等待队列中并滞后50ms。该参数通常用于在客户端识别自己的消费能力后,用于流控。系统的默认值是1000.

  • consumeConcurrentlyMaxSpan
单队列并行消费允许的最大跨度,也可以用于流控。

基于上述的分析,因为拉取缓存的消息过多,而消费能力并没有拉取的量级那么多,故导致消息积压,积压带来进程的oom。
  • 解决办法
限制消息本地缓存的量,避免积压过多。在配置的时候,声明pullThresholdForQueue为合适的值即可。
原文地址:https://www.cnblogs.com/asfeixue/p/11678045.html