RocketMQ 的消息传递机制及AOP

1,MQ中消息投递分为两种,一种是生产者往MQ Broker种投递,另一种是broker往消费者投递

 一个消息主题对应了多个消息队列,所以会产生两个问题,生成者应该把消息放入到哪个队列种,消费者应该从哪个消息队列中拉取消息。因为消息在系统之间传递的时候,跨越网络,消息的传播无法保证其有序

2,生产者投递消息的策略

  2.1:基于queue队列轮询算法,发布的信息有一个index,根据index对队列总数目取余。因为这个index是递增的

  2.2:基于queue队列轮询算法和消息投递延迟最小时间,MQ会统计消息投递的时间延迟,后面优先使用投递延迟时间最小的策略,当时间相同的时候,再使用队列轮询

  2.3: 当我们需要保证消息投递有序的时候,需要进行一些策略来限制,不然消费者消费的顺序发生错误的时候是不被允许的。解决办法:针对相同的订单号,通过一致性hash策略,将其放在同一个队列中;然后消费者再采用一定的策略(用一个线程处理一个queue来保证消息处理的顺序性)

    通过消息队列选择器这个接口来实现的,有以下三个类来实现这个接口

3,如何为消费者分配queue队列

RocketMQ对于消费者消费有两种形式,广播:一个消息会被每个消费者消费;集群:一个消息最多被一个消费者消费。

广播模式的话,消费者每次从每个队列中依次消费消息,

集群模式:底层是通过指定queue队列给消费者的方式来完成的。分配的单位是queue队列。Rocket MQ提供了一个接口AllocateMessageQueueStrategy,具体的实现类有下面这些。

   1,平均分配算法:取余依次按照余数进行分配,不是完全平均的

   2,基于环形平均算法:将消费者依次分配到队列的环形组成中

   3,基于机房临近原则

   4,基于机房分配算法

   5,基于一致性hash算法

   6,基于配置分配算法

默认情况下采用平均分配算法

4,RocketMQ添加监控和系统告警通知

场景来源:服务宕机,消费者下线,消息长时间or大量对接

解决办法:修改rocketMQ-console源码,主要是利用RocketMQ的mqadmin工具

原文地址:https://www.cnblogs.com/benbenzoule/p/13838521.html