Kafka 如何保证消息的消费顺序一致性

Kafka 如何保证消息的消费顺序?

在Kafka中Partition(分区)是真正保存消息的地方,发送的消息都存放在这里。Partition(分区)又存在于Topic(主题)中,并且一个Topic(主题)可以指定多个Partition(分区)。

在Kafka中,只保证Partition(分区)内有序,不保证Topic所有分区都是有序的。

所以 Kafka 要保证消息的消费顺序,可以有2种方法:

一、1个Topic(主题)只创建1个Partition(分区),这样生产者的所有数据都发送到了一个Partition(分区),保证了消息的消费顺序。

二、生产者在发送消息的时候指定要发送到哪个Partition(分区)。

我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。

(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;

(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值; 

 在Producer往Kafka插入数据时,控制同一Key分发到同一Partition,并且设置参数max.in.flight.requests.per.connection=1,也即同一个链接只能发送一条消息,如此便可严格保证Kafka消息的顺序

(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后

面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition

值,也就是常说的 round-robin 算法。

三、以下所有的分析都是基于同一个partition下的场景细化,
多partition下无法保障消息的顺序性,单一partition若碰到如下场景还是需要调整参数。

场景一:设置了retries>0,并且max.in.flight.requests.per.connection>1

1、retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries参数的值决定了生产者可以重发消息的次数,
如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。

2、max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。这种场景下无法保障单一partition的有序,一般来说要保障消息的有序性,对于消息的可靠性也是有要求的,
所以一般retries可以设置为大于0,但是max.in.flight.requests.per.connection设置为1即可,不过这样就有一个问题,导致了消息的吞吐量大大降低。


场景二:需要提升吞吐量max.in.flight.requests.per.connection设置大于1

此场景下业务要保障消息的吞吐量,那么max.in.flight.requests.per.connection必然就会选择更大的一个阈值,但是此场景还能保障消息有序性吗?答案是肯定的,
可以设置enable.idempotence=true,开启生产者的幂等生产,可以解决顺序性问题,并且允许max.in.flight.requests.per.connection设置大于1

四、参考:Kafka如何保证单partition有序?

1.producer发消息到队列时,通过加锁保证有序现在假设两个问题broker leader在给producer发送ack时,因网络原因超时,那么Producer将重试,造成消息重复。

先后两条消息发送。t1时刻msg1发送失败,msg2发送成功,t2时刻msg1重试后发送成功。造成乱序。

2.解决重试机制引起的消息乱序为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。
同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号)大一,
则Broker会接受它,否则将其丢弃:如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,
Producer抛出InvalidSequenceNumber如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,
Producer抛出DuplicateSequenceNumberSender发送失败后会重试,这样可以保证每个消息都被发送到broker

参考:

https://www.zhihu.com/question/266390197

https://zhuanlan.zhihu.com/p/262156222

原文地址:https://www.cnblogs.com/-courage/p/15252760.html