kafka-consumer端的设计细节

在记录中,ConsumerA,B,C代表一个消费者,Group代表一个Consumer Group

Consumer,Group,Topic,Partition的关系

  • Topic逻辑的订阅者是Group,每个Consumer 进程都会划归到一个Group中
  • 一条消息可以被多个Group订阅,就像广播到每个Group,但是只会被这个Group下的一个Consumer实例消费到

1、Consumer和Group

Group与Consumer的关系是动态维护的:

当一个Consumer 进程挂掉 或者是卡住时,该consumer所订阅的partition会被重新分配到该group内的其它的consumer上

当一个consumer 加入一个group时,会从其它consumer中分配出一个或多个partition 给这个新加入的consumer

当启动一个Consumer时,会根据配置的group.id指定它要加入的group

为了维持Consumer 与 Group的联系,需要Consumer周期性的发送heartbeat到coordinator(协调者)

当Consumer由于某种原因不能发Heartbeat失联时,会认为该consumer已退出,它订阅的partition会分配到其它consumer(rebalance)

2、Consumer与Partition

具体的Consumer实例实际订阅的其实是topic下的一个或多个Partition

partition分配的工作在consumer leader中完成

3、Coordinator

group在server端由GroupCoordinator(组协调器)来管理部分组和该组下的每个消费者的消费偏移量

每个consumer都有一个ConsumerCoordinator,负责与GroupCoordinator保持通信(包括但不限于)

consumer在poll()和joinGroup()之前必须保证Coordinator状态连接正常

4、rebalance过程

一个Consumer要join到一个group中,或者一个consumer退出时,都会触发rebalance。大体经过这几步:

1)变动的consumer会带上自己的一些元数据信息,向对应的GroupCoordinator发起Join请求

2)Coordinator 可能会收到不止一个join请求,从组里选出一个leader(就是选队长),并通知给各个consumer(组员)

3)leader 根据其他consumer的metadata,为每个member重新分配partition。分配完毕通过coordinator把最新分配情况同步给每个consumer

4)Consumer拿到最新的分配后,继续工作

注意:所有的consumer要先向coordinator注册,由coordinator选出leader, 然后由leader来分配state。 由leader来执行协调任务, 这样把负载分

配到client端,可以减轻broker的压力,支持更多数量的消费组,leader分配完后将结果发回组协调器,组协调器同步结果给各member

Consumer消费过程

 借图,这个是讲清了从消费者实例启动到抓取数据的整个过程,涉及到KafkaConsumer和Broker,GroupCoordinator进行确认,入组,心跳,抓取数据的过程

poll()

调用poll()时,consumer发起fetch请求,像partition拉取数据,拉取多少取决于配置的max.partition.fetch.bytes和max.poll.records来配置决定

可能出现的问题:consumer 进程一直在周期性的发送heartbeat,但是一直不消费消息(不调用poll()拉取消息),这种状态称为livelock

我取个名字叫"占着茅坑不拉屎"(不管是什么原因导致),这时候占着的茅坑(partition)没法被正常消费,但是也没法让出来给别的consumer

kafka为你想到了,使用max.poll.interval.ms这个配置来检测,如果蹲大厕不拉屎的时间超过了这个设定时间,会向GroupCoordinator发送一个

leaveGroup的通知,接着会触发rebalance,然后下一次poll()的时候重新发送joinGroup的请求,这启示我们如果批次消息处理耗时较长,这个检测

时间(max.poll.interval.ms)应该调大一点点,至少要大于你的处理消息的时间,换一种角度,解决方案是降低处理消息时间,要么优化业务逻辑,

要么通过max.poll.records设置减小拉取的条数

commit offset

前面辨析了,同一个group下,message不会被组员重复消费,不会漏消费,比如刚刚有一个批次的消息,consumerA消费完之后挂了,或者还没消费完就挂了,

rebalance后其他的consumer怎么知道从哪里开始接着消费呢?

跟了一遍源码,每个Consumer都有一个ConsumerCoordinator,在这个协调器中保存了一个变量叫SubscriptionState,当调用commit offset的请求时,会

将这个变量保存的信息一起提交,保存的信息,正是这个组对应的topic下,每个partition消息的位移

注意:决定消息什么时候被消费,控制权在消费者端

重要的配置

bootstrap.servers

consumer端配置的kafka集群的地址,是一个ip:port的list,逗号隔开,可以只填一个,kafka会自动发现集群里

其他的broker,但是如果配置的这个broker正好挂了,那就不行了,多多益善

group.id

重要属性,消费者必须处于一个消费者组里面,如果消费过程中,更改了groupId,会导致重新消费

heartbeat.interval.ms

心跳用于确保消费者ConsumerCoordinator 和协调者GroupCoordinator会话保持活动状态,当消费者加入或离开组时,方便broker端进行rebalance,

该值必须比session.timeout.ms小,通常不高于1/3。源码中位于AbstracCoordinator这个类下,定义了一个内部类:

private class HeartbeatThread extends Thread {}
HeartbeatThread() {
super("kafka-coordinator-heartbeat-thread" + (AbstractCoordinator.this.groupId.isEmpty() ? "" : " | " + AbstractCoordinator.this.groupId));
this.setDaemon(true);
}

构造方法中设置该线程为一个守护线程,因此对消费者来说,心跳是无感的,消费者实例已启动,心跳线程就开始工作

在定义的run方法中,检查了各项参数OK之后,会调用AbstractCoordinator.this.heartbeat.sentHeartbeat(now)来定频率的发送心跳

Heartbeat的结构如下,构造方法中确定heartbeatInterval必须小于sessionTimeout

public final class Heartbeat {
    private final long sessionTimeout;
    private final long heartbeatInterval;
    private final long maxPollInterval;
    private final long retryBackoffMs;
    private volatile long lastHeartbeatSend;
    private long lastHeartbeatReceive;
    private long lastSessionReset;
    private long lastPoll;
    private boolean heartbeatFailed;

    public Heartbeat(long sessionTimeout, long heartbeatInterval, long maxPollInterval, long retryBackoffMs) {
        if (heartbeatInterval >= sessionTimeout) {
            throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
        } else {
            this.sessionTimeout = sessionTimeout;
            this.heartbeatInterval = heartbeatInterval;
            this.maxPollInterval = maxPollInterval;
            this.retryBackoffMs = retryBackoffMs;
        }
    }
}

这个地方贴一个重点,Heartbeat中会判断两个超时来决定消费者是否"脱离了组织"

public boolean sessionTimeoutExpired(long now) {
    return now - Math.max(this.lastSessionReset, this.lastHeartbeatReceive) > this.sessionTimeout;
}
public boolean pollTimeoutExpired(long now) { return now - this.lastPoll > this.maxPollInterval; }

每次心跳会保存上一次发送心跳请求的最后时刻点,然后比较心跳有没有断开,因为sessionTimeout是单独的一个设计点,

可能和心跳的过程重合,所以取的是两者最近的时刻作为最后一次确认是连接状态的时间点

run()中对超时处理是这么定义的:session超时会判定消费者挂了,将会被踢下线,然后组进行rebalance;poll超时会离开group,之所以是maybeLeave,

是因为下次poll的时候可能再重新入组,因为这个poll是KafkaConsumer主动调用的,所以如果上一次poll没拉回消息,也不要"休息"太久,

这样会平凡源码如下,方法名很容易读:

else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
    AbstractCoordinator.this.coordinatorDead();
} else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
    AbstractCoordinator.this.maybeLeaveGroup();
} 

 enable.auto.commit

是否自动提交offset,默认为true,只能保证at least once,通常大部分业务都要设置为false,业务手动提交

原文地址:https://www.cnblogs.com/yb38156/p/14590350.html