Kafka:消费者

消费者和消费组

消费者负责订阅kafka中的主题,并且从订阅的主题上拉取消息。

kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递到订阅它的每个消费组中的一个消费者。

如图所示:

image-20210207171454004

某个主题共有4个分区:P0、P1、P2、P3。有两个消费组A和B都订阅了这个主题。消费组A中有4个消费者,消费组B中有两个消费者。按照Kafka的默认规则,最后的分配结果是A中的每个消费者分配到一个分区,B中每个消费者分配到两个分区,两个消费组之间互不影响,每个消费者只能消费到所分配的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费。

分区的分配:

如下图,消费组内只有1个消费者:

image-20210207172229520

组中新加入了一个消费者:按照逻辑,需要将原来消费C0的部分分区分配给消费者C1消费,如下图。

image-20210207172329297

消费者和消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者个数来提高(或降低)整体的消费能力。

以上分配逻辑都是基于默认的分区分配逻辑进行分析的,可以通过消费者客户端参数partition.assignment.stratyegy来设置消费者与订阅主题之间的分区分配策略。

客户端开发

消费者代码

    private static final String TOPIC = "wj";
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream(ResourceUtils.getFile("classpath:kafka_consumer.properties")));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }
    }
#key序列化器
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#值序列化器
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#集群地址
bootstrap.servers=192.168.1.51:9092
group.id=group.demo

参数配置

  • bootstrap.servers 指定连接kafka集群所需的broker地址清单。当设置两个以上broker信息时,其中任意一个宕机了,消费者仍然能连接到集群中。
  • group.id 消费者隶属消费组的名称,默认为""。如果为空,会报异常。一般来说,这个参数需要设置成有也业务意义的名称。
  • key.deserializer和value.deserializer 与生产端参数对应,需要反序列化将字节数组转换为原有对象格式
  • client.id 设定Consumer对应的客户端id,默认"",如果不设置,则Consumer会自动生成一个非空字符串,如consumer-1,consumer-2等。

订阅主题与分区

一个消费者可以订阅一个或多个主题。使用KafkaConsumer.subscibe()方法订阅一个主题。

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

topics: 订阅的主题,支持正则表达式,如果连续订阅两次(调用两次),则以最后一次调用为准

listener:再均衡监听器,后面再说。

消费者不仅可以通过subscibe订阅主题,而且可以通过assign来订阅主题的特定分区。

public void assign(Collection<TopicPartition> partitions)

partitions 用来指定需要订阅的分区集合,TopicPartition类在Kafka客户端中用来表示分区。

如果我们事先不知道主题有多少个分区怎么版,kafkaConsumer中的partitionsFor()方法可以用来查询指定主题的元数据信息。

public List<PartitionInfo> partitionsFor(String topic)

以下是查询出的元数据信息:

[Partition(topic = wj, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = wj, partition = 1, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]

leader:代表分区的leader副本所在位置

replicas:代表分区的AR集合

isr:代表分区的isr集合

offlineReplicas:代表分区OSR集合

通过partitionsFor方法,我们就可以通过assign方法来实现订阅主题的功能

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
List<TopicPartition> partitions = consumer.partitionsFor(TOPIC).stream()
    .map(p -> new TopicPartition(p.topic(), p.partition()))
    .collect(Collectors.toList());
consumer.assign(partitions);

取消订阅:

public void unsubscribe()
//以下两个方法只要传入的集合为null,那么也表示取消订阅
public void assign(Collection<TopicPartition> partitions)
public void subscribe(Collection<String> topics)

但是没有订阅任何主题或分区,就会抛出异常

消息消费

消息的消费一般有两种模式:拉模式和推模式。拉模式是消费者主动向服务端发起请求来拉取消息。推模式是服务端主动将消息推送给消费者。

kafka的消费是基于拉模式的。Kafka的消息消费是一个不断轮询的过程,消费者需要重复调用poll方法,返回值是ConsumerRecords,包装了ConsumerRecord。

ConsumerRecord:

public class ConsumerRecord<K, V> {
    private final String topic;//主题名称
    private final int partition;//分区编号
    private final long offset;//所属分区的偏移量
    private final long timestamp;//时间戳
    //两种类型:一种表示消息创建的时间戳,一种表示消息追加到日志的时间戳
    private final TimestampType timestampType;
    private final int serializedKeySize;//key序列化后的大小
    private final int serializedValueSize;//value序列化后的大小
    private final Headers headers;//消息头
    private final K key;
    private final V value;
    private volatile Long checksum;//CRC32的校验值

ConsumerRecords的方法:

image-20210208095838337

位移提交

对于kafka中的分区而言,每一条消息都有唯一的offset,用来表示消息在分区对应的位置。对于消费者而言,也有一个offset的概念,表示消费者消费到分区中某个消息所在的位置。

对于消息在分区中的位置,我们将offset称为"偏移量"。对于消费者消费到的位置,将offset称为"位移"。

每次调用poll方法时,它返回的是还没有被消费过的消息集(前提消息已经存储在kafka中),要做到这一点,就需要记录上一次消费时的消费位移,并且该消费位移也需要持久化保存,而不是保存在内存中。这里将消费位移存储起来(持久化)动作成为"提交",消费者在消费完消息之后需要执行消费位移的提交。

image-20210208102353721

x表示某一次拉取操作中次分区消息的最大偏移量,不过消费者需要提交的消费位移并不是x,而是x+1,它表示下一条需要拉取的消息的位置。

消费者中还有一个committed offset的概念,表示已经提交过的消费位移。

public long position(TopicPartition partition)//获取position的值
public OffsetAndMetadata committed(TopicPartition partition)//获取committed offset的值

消费位移演示:

    private static final String TOPIC = "wj";
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream(ResourceUtils.getFile("classpath:kafka_consumer.properties")));
        TopicPartition tp = new TopicPartition(TOPIC, 0);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.assign(Collections.singletonList(tp));
        long lastConsumedOffset = -1;
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if(records.isEmpty()) break;
            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
            lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
            consumer.commitAsync();//同步提交消费位移
        }
        System.out.println("Consumer offset is "+ lastConsumedOffset);
        OffsetAndMetadata metadata = consumer.committed(tp);
        System.out.println("Consumer offset is "+ metadata.offset());
        long position = consumer.position(tp);
        System.out.println("the offset of next record is "+ position);
    }
#key序列化器
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#值序列化器
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#集群地址
bootstrap.servers=192.168.1.51:9092
group.id=group.demo

image-20210208105516670

自动提交

kafka中默认的消费位移的提交方式是自动自交,这个由消费者客户端参数enable.auto.commit配置,默认值是true。这个默认的自动提交是定期提交,这个定期周期时间由客户端参数auto.commit.interval.ms配置,默认5s。

默认方式下,消费者每隔5s会将拉取到每个分区中最大的消息位移进行提交。自动提交的动作在poll方法逻辑中而完成。

对于位移提交的具体时机把握也很有讲究。

image-20210208133048023

消息丢失:如上图,x+5表示正在处理的位置,如果拉取到了直接提交,即提交了x+8,如果在x+5时遇到异常,故障恢复后,我们重新拉取消息是从x+8开始的,那么x+6和x+7的消息就丢失了

重复消费:如果位移提交动作是在消费完所有消息才执行,那么x+5发生异常,故障恢复后,我们重新拉取的消息还是x+2,那么x+2到x+4之间的消息又重新消费了一遍。

手动提交

很多时候,并不是拉取到消息就算消费完成,而是要进行一系列业务处理(写入数据库、写入缓存等),等到所有的业务处理完成才能认为消息成功被消费,然后才手动位移提交。

手动提交,消费者客户端配置enable.auto.commit配置为false。

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交细分为同步提交和异步提交

public void commitAsync()
public void commitSync()

commitSync无参方法只能提交当前批次对应的position值。

对于commitSync的无参方法而言,它提交消费位移的频率、拉取批次消息、处理批次消息的频率是一样的,如果想要更细粒度的提交,可以使用commitSync的有参方法

//offsets参数用来提交指定分区的位移
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)

例子:业务每消费一次提交一次位移

while(true){
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        long offset = record.offset();
        System.out.println(offset);
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset+1)));
    }
}

image-20210208135926880

例子:按照分区粒度同步提交消费位移

上一种提交方式性能消耗较高,更多的是按照分区的粒度划分提交位移的界限

while(true){
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (TopicPartition topicPartition : records.partitions()) {

        List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
        for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
            System.out.println("do something...");
        }
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset+1)));
    }

}

异步提交:

commitAsync在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作,性能提高了。

三个重载方法:

image-20210208140854804

其中OffsetCommitCallback是异步提交的回调方法,会调用onComplete方法

while(true){
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (TopicPartition topicPartition : records.partitions()) {

        List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
        for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
            System.out.println("do something...");
        }
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitAsync((offsets, exception) -> {
            System.out.println(offsets);
        });
    }
}

image-20210208141038288

控制或关闭消费

KafkaConsumer中使用pause和resume来实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作

public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)

paused:返回被暂停的分区集合:

public Set<TopicPartition> paused()

wakeup

//该方法可以退出poll逻辑,并抛出WakeUpException异常,通过它我们可以跳出循环
public void wakeup()

跳出循环以后,我们需要调用close方法关闭资源

完整的消费逻辑:

    private volatile static AtomicBoolean isRunning = new AtomicBoolean(true);
    private static final String TOPIC = "wj";

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream(ResourceUtils.getFile("classpath:kafka_consumer.properties")));
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition tp = new TopicPartition(TOPIC, 0);
        consumer.assign(Collections.singletonList(tp));
        try {
            while(isRunning.get()){
                //poll..
                //process the record
                //commit offset
            }
        }catch (WakeupException ignore){

        }catch (Exception e){

        }finally {
            consumer.close();
        }
    }

如果想要关闭消费逻辑,可以调用consumer.wakeup(),也可以调用isRunning.set(false)

指定位移消费

  • 新的消费组建立
  • 一个消费组的新的消费者订阅了一个新的主题
  • 当__comsumer_offsets主题内有关这个消费组的位移信息过期而被删除

上述三种情况,都会导致消费者查找不到所记录的消费位移,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,默认"latest",即从分区末尾消费消息,另一种配置是"earliest",即从0开始消费

image-20210208144126951

还有一种配置是"none",配置此值意味着查询不到消费位移的时候,直接抛出NoOffsetForPartitionException异常。如果有消费位移,那么不会有任何异常。

seek

消息的拉取是根据poll方法的,但是我们无法精确掌握其消费的起始位置,而seek方法正好提供该功能。

//partition 分区 offset 指定消费位移
public void seek(TopicPartition partition, long offset)

通过seek方法只能重置消费者分配到的分区的消费位置,而分区的分配是在poll方法的调用过程中实现的。所以在seek之前需要先调用一次poll。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.poll(1000);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size()==0){
    consumer.poll(100);
    //获取到消费者所分配到的分区信息
    assignment = consumer.assignment();
}
for (TopicPartition topicPartition : assignment) {
    consumer.seek(topicPartition, 100);
}
while(true){
    ConsumerRecords<String, String> records = consumer.poll(1000);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

其他相关方法:(重载方法就不写上去了)

//获取指定分区的末尾消息位置
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
//获取指定分区的开头消息位置   
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) 
//获取指定时间戳时候的消息位置,通过此方法可以获取昨天当前时间的消费位移等
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
//直接从开头消费 
public void seekToBeginning(Collection<TopicPartition> partitions)
//直接从末尾消费 
public void seekToEnd(Collection<TopicPartition> partitions)

通过seek方法,导致位移越界怎么办?

此时,kafka会根据auto.offset.reset参数的默认值来使消费位移重置。

再均衡

再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,不过在再均衡期间,消费组内的消费者是无法读取消息的,消费组会不可用。

那么消费者消费完还没有来得及提交消费位移就发生了再均衡操作,这个分区分配给了另一个消费者,那么那个消息就会被重新消费一遍,发生重复消费情况。所以要尽量避免再均衡的发生。

通过再均衡监听器ConsumerRebalanceListener用来设定发生再均衡动作前后的一些准备或收尾工作。

public interface ConsumerRebalanceListener {

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

onPartitionsRevoked:此方法会在再均衡开始之前和消费者停止读取消息之后被调用,通过此方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象发生。

onPartitionsAssigned:此方法会在重新分配分区之后和消费者开始读取消费者之前被调用

    private volatile static AtomicBoolean isRunning = new AtomicBoolean(true);
    private static final String TOPIC = "wj";

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream(ResourceUtils.getFile("classpath:kafka_consumer.properties")));
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        HashMap<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(currentOffsets);
                currentOffsets.clear();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }
        });
        try {
            while(isRunning.get()){
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                consumer.commitAsync(currentOffsets,null);
            }
        }catch (WakeupException ignore){

        }catch (Exception e){

        }finally {
            consumer.close();
        }
    }

这里先将消费位移暂存到一个局部变量中,正常消费时通过commitAsync异步提交,发生再均衡后,会调用onPartitionsRevoked方法回调执行使用commitSync同步提交,并清空局部变量,以避免重复消费。

消费者拦截器

自定义消费者拦截器需要实现ConsumerInterceptor接口。

public interface ConsumerInterceptor<K, V> extends Configurable {
	//在poll方法返回之前调用该方法,通过此方法可以对records进行相关定制包括过滤ConsumerRecord
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
	//该方法会在提交完消费位移之后调用,可以记录跟踪所提交的位移信息
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    public void close();
}

配置自定义拦截器:

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "myInterceptor");

消费者拦截器也有拦截器链的概念(与生产者一样),这里不再赘述。

消费者参数

只讲解部分参数。

  • fetch.min.bytes:poll请求中能从kafka中拉取的最小数据量,默认1B。如果数据量小于这个值,那么就会一直等待,直到数据量满足这个参数的配置大小。
  • fetch.max.bytes:poll请求中能从kafka中拉取的最大数据量,默认50M
  • fatch.max.wait.ms:这个参数与fetch.min.bytes有关,如果kafka参考fetch.min.bytes要求,那么可能会一直阻塞而无法工作,那么该参数就是配置最长阻塞时间,默认值500ms。
  • max.partition.fetch.bytes:配置从每个分区里返回给consumer的最大数据量,默认值是1M
  • max.poll.records:配置消费者在一次poll中拉取的最大消息数,默认500条。
  • connections.max.idle.ms:指定多久之后关闭闲置连接
  • exclude.internal.topics:用来指定Kafka的内部主题是否可以向消费者公开,默认为true,表示只能使用subscribe(Collection)方式而不能使用subscribe(Pattern)方式来订阅主题,为false则没有该限制。
  • receive.buffer.bytes:设置Socket接收消息缓冲区的大小,默认值64KB,设置-1,则为系统默认值
  • send.buffer.bytes:设置Socket发送消息缓冲区的大小,默认128KB
  • request.timeout.ms:配置consumer等待请求响应的最长时间,默认30000ms
  • metadata.max.age.ms:配置元数据过期时间,默认300000ms,即5分钟
  • reconnect.backoff.ms:配置尝试重新连接指定主机之前的等待时间,默认50ms
  • retry.backoff.ms:配置尝试重新发送失败的请求到指定的主题分区之间的等待时间,默认100ms
  • isolation.level:配置消费者的事务隔离级别,字符串类型,有效值"read_uncommitted"(默认情况)和"read_commited",表示消费者所消费到的位置。
    • read_uncommitted:消费者忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置
    • read_commited:消费到High Watermark处位置
原文地址:https://www.cnblogs.com/wwjj4811/p/14389377.html