rocketmq-顺序消息

参考:

https://blog.csdn.net/zhaoming19870124/article/details/90900808

https://blog.csdn.net/hosaos/article/details/90675978

https://www.cnblogs.com/hzmark/p/orderly_message.html

RocketMQ局部顺序消息实现原理

顺序消息:是指消息的消费顺序与消息的产生顺序相同;顺序消息分为全局顺序消息和局部顺序消息,全局顺序消息是指:在某个topic下的所有消息都要保证消费顺序与产生顺序相同;部分顺序消息是指:只要保证每一组消息被顺序消费即可。在RocketMQ中,若要实现全局顺序消息,首先把topic的读写队列设置为一,然后把生产者producer和消费者consumer都设置成单线程即可。但这样一来,就需要牺牲高并发和高吞吐量了。一般情况下,根据业务的需要,我们只需要实现局部顺序消息即可。

在高并发情况下,RocketMQ实现局部顺序消息是通过消息的生产者和消息的消费者协同完成的。发送端需要做的事情:把同一个小组内的消息发送到指定的队列Message Queue中;消费端需要做的事情:仅用一个线程处理这个队列中的消息。

默认情况下,消息的生产端实现负载均衡的做法是:轮流向各个消息队列Message Queue中发送消息。消息的消费端实现负载均衡的做法是:把消息队列的总数简单的除以消费者的个数,每个消费者负责一些消息队列(注意:消费者的数量不要超过消息队列的个数,否则多余的消费者接收不到消息)。在我们人为不干涉的情况下,把一条消息投递到哪个队列以及被哪个消费者下的线程消费都是未知的。

为了实现局部顺序消息的消费,发送端通过使用MessageQueueSelector类来控制把消息发往哪个消息队列Message Queue中,其代码如下:

SendResult result = null;
try {
    result = producer.send(message, new MessageQueueSelector() {
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer queueNumber = (Integer)arg;
            return mqs.get(queueNumber);
        }
    }, 2);
} catch (MQClientException e) {
    e.printStackTrace();
} catch (RemotingException e) {
    e.printStackTrace();
} catch (MQBrokerException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(result);

在我们初始化消费者时,需要指定监听器的类型:

MessageListenerOrderly:在消息需要按局部顺序消费时使用;

MessageListenerConcurrently:在消息不需要按局部顺序消费时使用。

在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息所在的Consumer Queue所对应的的锁,这样就可以保证在同一时间、同一个Consumer Queue的消息不被并发消费,但不同的Consumer Queue的消息可以并发处理。

为了实现局部顺序消息的消费,消息的消费端需要指定监听器类型为:MessageListenerOrderly,代码如下:

this.consumer.setMessageListener(new MessageListenerOrderly() {
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        try {
            //处理业务逻辑
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            e.printStackTrace();
            //当消费消息的过程中,若是出现了异常,则稍后再重新消费
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});

RocketMQ-顺序消息Demo及实现原理分析

场景分析

顺序消费是指消息的产生顺序和消费顺序相同

假设有个下单场景,每个阶段需要发邮件通知用户订单状态变化。用户付款完成时系统给用户发送订单已付款邮件,订单已发货时给用户发送订单已发货邮件,订单完成时给用户发送订单已完成邮件。

发送邮件的操作为了不阻塞订单主流程,可以通过mq消息来解耦,下游邮件服务器收到mq消息后发送具体邮件,已付款邮件、已发货邮件、订单已完成邮件这三个消息,下游的邮件服务器需要顺序消费这3个消息并且顺序发送邮件才有意义。否则就会出现已发货邮件先发出,已付款邮件后发出的情况。

但是mq消费者往往是集群部署,一个消费组内存在多个消费者,同一个消费者内部,也可能存在多个消费线程并行消费,如何在消费者集群环境中,如何保证邮件mq消息发送与消费的顺序性呢?

顺序消费又分两种,全局顺序消费和局部顺序消费

全局顺序消费

什么是全局顺序消费?所有发到mq的消息都被顺序消费,类似数据库中的binlog,需要严格保证全局操作的顺序性

那么RocketMQ中如何做才能保证全局顺序消费呢?

这就需要设置topic下读写队列数量为1

为什么要设置读写队列数量为1呢?
假设读写队列有多个,消息就会存储在多个队列中,消费者负载时可能会分配到多个消费队列同时进行消费,多队列并发消费时,无法保证消息消费顺序性

那么全局顺序消费有必要么?
A、B都下了单,B用户订单的邮件先发送,A的后发送,不行么?其实,大多数场景下,mq下只需要保证局部消息顺序即可,即A的付款消息先于A的发货消息即可,A的消息和B的消息可以打乱,这样系统的吞吐量会更好,将队列数量置为1,极大的降低了系统的吞吐量,不符合mq的设计初衷

举个例子来说明局部顺序消费。假设订单A的消息为A1,A2,A3,发送顺序也如此。订单B的消息为B1,B2,B3,A订单消息先发送,B订单消息后发送

消费顺序如下
A1,A2,A3,B1,B2,B3是全局顺序消息,严重降低了系统的并发度
A1,B1,A2,A3,B2,B3是局部顺序消息,可以被接受
A2,B1,A1,B2,A3,B3不可接收,因为A2出现在了A1的前面

局部顺序消费

那么在RocketMQ里局部顺序消息又是如何怎么实现的呢?

要保证消息的顺序消费,有三个关键点

  1. 消息顺序发送
  2. 消息顺序存储
  3. 消息顺序消费

第一点,消息顺序发送,多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性

第二点,消息顺序存储,mq的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。对应到mq中,需要使用MessageQueueSelector来选择要发送的queue,即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中

第三点,消息顺序消费,要保证消息顺序消费,同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即,同一时刻,一个消费队列只能被一个消费者中的一个线程消费

上面第一、第二点中提到,要保证消息顺序发送和消息顺序存储需要使用mq的同步发送和MessageQueueSelector来保证,具体Demo会有体现

至于第三点中的加锁操作会结合源码来具体分析

Demo

producer中模拟了两个线程,并发顺序发送100个消息的情况,发送的消息中,key为消息发送编号i,消息body为orderId,大家注意下MessageQueueSelector的使用

consumer的demo有两个,第一个为正常集群消费的consumer,另外一个是顺序消费的consumer,从结果中观察消息消费顺序

理想情况下消息顺序消费的结果应该是,同一个orderId下的消息的编号i值应该顺序递增,但是不同orderId之间的消费可以并行,即局部有序即可

Producer Demo

public class Producer {
    public static void main(String[] args)  {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            ((DefaultMQProducer) producer).setNamesrvAddr("111.231.110.149:9876");
            producer.start();
			
			//顺序发送100条编号为0到99的,orderId为1 的消息
            new Thread(() -> {
                Integer orderId = 1;
                sendMessage(producer, orderId);
            }).start();
			//顺序发送100条编号为0到99的,orderId为2 的消息
            new Thread(() -> {
                Integer orderId = 2;
                sendMessage(producer, orderId);
            }).start();
			//sleep 30秒让消息都发送成功再关闭
            Thread.sleep(1000*30);

            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void sendMessage(MQProducer producer, Integer orderId) {
        for (int i = 0; i < 100; i++) {
            try {
                Message msg =
                        new Message("TopicTestjjj", "TagA", i + "",
                                (orderId + "").getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.println("message send,orderId:"+orderId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


Normal Consumer Demo

模拟了一个消费者中多线程并行消费消息的情况,使用的消费监听器为MessageListenerConcurrently

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setNamesrvAddr("111.231.110.149:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestjjj", "*");
        //单个消费者中多线程并行消费
        consumer.setConsumeThreadMin(3);
        consumer.setConsumeThreadMin(6);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
//                    System.out.println("收到消息," + new String(msg.getBody()));
                    System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}


看下结果输出,如图,同一个orderId下,编号为10的消息先于编号为9的消息被消费,不是正确的顺序消费,即普通的并行消息消费,无法保证消息消费的顺序性

在这里插入图片描述

Order Consumer Demo

顺序消费的消费者例子如下,使用的监听器是MessageListenerOrderly

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("111.231.110.149:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestjjj", "TagA");

        //消费者并行消费
        consumer.setConsumeThreadMin(3);
        consumer.setConsumeThreadMin(6);

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//                context.setAutoCommit(false);
                for (MessageExt msg : msgs) {
                    System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}


结果如下,同一个orderId下,消息顺序消费,不同orderId并行消费,符合预期
在这里插入图片描述

源码分析

在源码分析之前,先来思考下几个问题

前面已经提到实现消息顺序消费的关键点有三个,其中前两点已经明确了解决思路

第一点,消息顺序顺序发送,可以由业务方在单线程使用同步发送消息的方式来保证
第二点,消息顺序存储,可以由业务方将同一个业务编号的消息发送到一个队列中来实现

还剩下第三点,消息顺序消费,实现消息顺序消费的关键点又是什么呢?

举个例子,假设业务方针对某个订单发送了N个顺序消息,这N个消息都发送到了mq服务端的一个队列中,假设消费者集群中有3个消费者,每个消费者中又是开了N个线程多线程消费

第一种情形,假设3个消费者同时拉取一个队列的消息进行消费,结果会怎么样?N个消息可能会分配在3个消费者中进行消费,多机并行的情况下,消费能力的不同,无法保证这N个消息被顺序消费,所以得保证一个消费队列同一个时刻只能被一个消费者消费

假设又已经保证了一个队列同一个时刻只能被一个消费者消费,那就能保证顺序消费了?同一个消费者多线程进行消费,同样会使得的N个消费被分配到N个线程中,一样无法保证消息顺序消费,所以还得保证一个队列同一个时刻只能被一个消费者中一个线程消费

下面顺序消息的源码分析中就针对这两点来进行分析,即

  1. 如何保证一个队列只被一个消费者消费
  2. 如何保证一个消费者中只有一个线程能进行消费

锁定MessageQueue

先看第一个问题,如何保证一个队列只被一个消费者消费。

消费队列存在于broker端,如果想保证一个队列被一个消费者消费,那么消费者在进行消息拉取消费时就必须想mq服务器申请队列锁,消费者申请队列锁的代码存在于RebalanceService消息队列负载的实现代码中

先明确一点,同一个消费组中的消费者共同承担topic下所有消费者队列的消费,因此每个消费者需要定时重新负载并分配其对应的消费队列,具体为消费者分配消费队列的代码实现在RebalanceImpl#rebalanceByTopic中,本文不多讲

客户端实现

消费者重新负载,并且分配完消费队列后,需要向mq服务器发起消息拉取请求,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance中,针对顺序消息的消息拉取,mq做了如下判断

在这里插入图片描述
核心思想就是,消费客户端先向broker端发起对messageQueue的加锁请求,只有加锁成功时才创建pullRequest进行消息拉取,下面看下lock加锁请求方法
在这里插入图片描述
代码实现逻辑比较清晰,就是调用lockBatchMQ方法发送了一个加锁请求,那么broker端收到加锁请求后的处理逻辑又是怎么样?

broker端实现

broker端收到加锁请求的处理逻辑在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中关键属性如下

//默认锁过期时间 60秒
    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
 //重入锁
    private final Lock lock = new ReentrantLock();
 //key为消费者组名称,value是一个key为MessageQueue,value为LockEntry的map
    private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);


LockEntry对象中关键属性如下

//消费者id
private String clientId;
//最后加锁时间
private volatile long lastUpdateTimestamp = System.currentTimeMillis();


isLocked方法如下
public boolean isLocked(final String clientId) {
            boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();
        }

        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

            return expired;
        }


对messageQueue进行加锁的关键逻辑如下:

如果messageQueue对应的lockEntry为空,标志队列未加锁,返回加锁成功
在这里插入图片描述
如果lockEntry对应clientId为自己并且没过期,标志同一个客户端重复加锁,返回加锁成功(可重入)
在这里插入图片描述

如果锁已经过期,返回加锁成功
在这里插入图片描述

总而言之,broker端通过对ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable的维护来达到messageQueue加锁的目的,使得同一时刻,一个messageQueue只能被一个消费者消费

synchronized申请线程独占锁

假设消费者对messageQueue的加锁已经成功,那么就进入到了第二个步骤,创建pullRequest进行消息拉取,消息拉取部分的代码实现在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中进行消费,顺序消费的实现为ConsumeMessageOrderlyService,提交消息进行消费的方法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下
在这里插入图片描述
可以看到,构建了一个ConsumeRequest对象,并提交给了ThreadPoolExecutor来并行消费,看下顺序消费的ConsumeRequest的run方法实现

在这里插入图片描述
里面先从messageQueueLock中获取了messageQueue对应的一个锁对象,看下messageQueueLock的实现

在这里插入图片描述
其中维护了一个ConcurrentMap<MessageQueue, Object> mqLockTable,使得一个messageQueue对应一个锁对象object

获取到锁对象后,使用synchronized尝试申请线程级独占锁

  1. 如果加锁成功,同一时刻只有一个线程进行消息消费
  2. 如果加锁失败,会延迟100ms重新尝试向broker端申请锁定messageQueue,锁定成功后重新提交消费请求

至此,第三个关键点的解决思路也清晰了,基本上就两个步骤

  1. 创建消息拉取任务时,消息客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个消费客户端消费
  2. 消息消费时,多线程针对同一个消息队列的消费先尝试使用synchronized申请独占锁,加锁成功才能进行消费,使得一个MessageQueue同一个时刻只能被一个消费客户端中一个线程消费

顺序消息重试机制

在使用顺序消息时,一定要注意其异常情况的出现,对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 版会自动不断地进行消息重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX_VALUE.这时,应用会出现消息消费被阻塞的情况。因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生

重要的事再强调一次:在使用顺序消息时,一定要注意其异常情况的出现!

聊一聊顺序消息(RocketMQ顺序消息的实现机制)

当我们说顺序时,我们在说什么?

日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。

比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B。

上面的例子之所以成立是因为他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间。如果A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗?

如果没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么断定A和B的顺序?

显而易见的,如果A、B两个事件之间如果是有因果关系的,那么A一定发生在B之前(前因后果,有因才有果)。相反,在没有一个绝对的时间的参考的情况下,若A、B之间没有因果关系,那么A、B之间就没有顺序关系。

那么,我们在说顺序时,其实说的是:

  • 有绝对时间参考的情况下,事件的发生时间的关系;

  • 和没有时间参考下的,一种由因果关系推断出来的happening before的关系;

在分布式环境中讨论顺序

当把顺序放到分布式环境(多线程、多进程都可以认为是一个分布式的环境)中去讨论时:

  • 同一线程上的事件顺序是确定的,可以认为他们有相同的时间作为参考

  • 不同线程间的顺序只能通过因果关系去推断

(点表示事件,波浪线箭头表示事件间的消息)

上图中,进程P中的事件顺序为p1->p2->p3->p4(时间推断)。而因为p1给进程Q的q2发了消息,那么p1一定在q2之前(因果推断)。但是无法确定p1和q1之间的顺序关系。

推荐阅读《Time, Clocks, and the Ordering of Events in a Distributed System》,会透彻的分析分布式系统中的顺序问题。

消息中间件中的顺序消息

什么是顺序消息

有了上述的基础之后,我们回到本篇文章的主题中,聊一聊消息中间件中的顺序消息。

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费

全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?

如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是需要用户自己去保障的。

而对于顺序消费,则需要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为什么不说他们应该在一个线程中被消费呢?)。

全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)。

如何保证顺序

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序

  2. 消息被存储时保持和发送的顺序一致

  3. 消息被消费时保持和存储的顺序一致

发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。

如下图所示:

对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序):

  • 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去

  • 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证

    • a1、b1、b2、a2、a3、b3是可以接受的

    • a1、a2、b1、b2、a3、b3也是可以接受的

    • a1、a3、b1、b2、a2、b3是不能接受的

  • 消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益

开源RocketMQ中顺序的实现

上图是RocketMQ顺序消息原理的介绍,将不同订单的消息路由到不同的分区中。文档只是给出了Producer顺序的处理,Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序,具体实现如下。

Producer端

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

  • List<MessageQueue> mqs:消息要发送的Topic下所有的分区

  • Message msg:消息对象

  • 额外的参数:用户可以传递自己的参数

比如如下实现就可以保证相同的订单的消息被路由到相同的分区:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。

对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。RocketMQ中的实现如下:

  1. PullMessageService单线程的从Broker获取消息

  2. PullMessageService将消息添加到ProcessQueue中(ProcessMessage是一个消息的缓存),之后提交一个消费任务到ConsumeMessageOrderService

  3. ConsumeMessageOrderService多线程执行,每个线程在消费消息时需要拿到MessageQueue的锁

  4. 拿到锁之后从ProcessQueue中获取消息

保证消费顺序的核心思想是:

  • 获取到消息后添加到ProcessQueue中,单线程执行,所以ProcessQueue中的消息是顺序的

  • 提交的消费任务时提交的是“对某个MQ进行一次消费”,这次消费请求是从ProcessQueue中获取消息消费,所以也是顺序的(无论哪个线程获取到锁,都是按照ProcessQueue中消息的顺序进行消费)

顺序和异常的关系

顺序消息需要Producer和Consumer都保证顺序。Producer需要保证消息被路由到正确的分区,消息需要保证每个分区的数据只有一个线程消息,那么就会有一些缺陷

  • 发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试

  • 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大

  • 消费的并行读依赖于分区数量

  • 消费失败时无法跳过

不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

原文地址:https://www.cnblogs.com/xuwc/p/14090564.html