【Kafka】Consumer API

Consumer API


Kafka官网文档给了基本格式

http://kafka.apachecn.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

JavaAPI 模板

自动提交offset

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

手动提交offset

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

自定义 自动提交offset

在这之前需要明白一点,自动提交是有可能造成重复消费的

比如我们设置的props.put("auto.commit.interval.ms", "1000");——提交offset值的时间间隔为1s
现在有这么几条数据等待消费


157 hello offset

287 hello world
295 abc test 900ms
351 hello abc 1000ms


157 hello offset 为这一次提交offset值的起点,351 hello abc 为提交offset值的重点
295 abc test 是到900ms的时候提交的offset,如果在此时发生了宕机,重新开始就会从157 hello offset再次进行消费,就会造成重复消费的情况

package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka服务器地址
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //指定消费者组的名字
        props.put("group.id", "testGroup");
        //允许程序自动提交offset,保存到kafka当中的一个topic中去
        props.put("enable.auto.commit", "true");
        //每隔多长时间提交一次offset的值
        props.put("auto.commit.interval.ms", "1000");
        //数据key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic:test,并消费其中的数据
        consumer.subscribe(Arrays.asList("test"));
        //死循环拉取数据
        while (true) {
            //所有拉取到的数据都封装在了ConsumerRecords
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                int partition = record.partition();
                String value = record.value();
                long offset = record.offset();
                String key = record.key();
                System.out.printf("数据的key为" + key + ",数据的value为" + value + ",数据的offset为" + offset + ",数据的分区为" + partition);
            }

        }
    }
}

自定义 手动提交offset
package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ManualOffsetCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "testGroup2");
        //关闭自动提交offset值,改为手动提交
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                int partition = record.partition();
                String value = record.value();
                long offset = record.offset();
                String key = record.key();
                System.out.printf("数据的key为" + key + ",数据的value为" + value + ",数据的offset为" + offset + ",数据的分区为" + partition);
            }
                // ConsumerRecords 里面的数据消费完后,需要提交offset值
                // 使用异步提交的方法,不会阻塞程序的消费
                // consumer.commitSync();
                // 同步提交
                 consumer.commitSync();
        }
    }
}

消费完每个分区后手动提交offset
package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class CommitPartition {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定kafka服务器地址
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //指定消费者组的名字
        props.put("group.id", "testGroup4");
        //关闭自动提交offset值,改为手动提交
        props.put("enable.auto.commit", "false");
        //数据key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义kafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅topic:test 并消费其中的数据
        kafkaConsumer.subscribe(Arrays.asList("test"));
        //调用poll方法,获取所有的数据,包含各个分区的数据
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);
        //获取topic中所有分区
        Set<TopicPartition> partitions = consumerRecords.partitions();
        //循环消费数据
        for (TopicPartition topicPartition : partitions) {
            //获取一个分区立面的所有数据
            List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
            for (ConsumerRecord<String, String> record : records) {
                int partition = record.partition();
                String value = record.value();
                String key = record.key();
                long offset = record.offset();
                System.out.println("数据的key为" + key + ",数据的value为" + value + ",数据的offset为" + offset + ",数据的分区为" + partition);
            }
            //提交partition的offset值
            //Map<TopicPartition, OffsetAndMetadata> offsets

            //获取分区里面最后一条数据的offset值
            long offset = records.get(records.size() - 1).offset();
            Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset));

            //处理完成一个分区里面的数据后提交offset
            kafkaConsumer.commitSync(topicPartitionOffsetAndMetadataMap);
        }

    }
}

消费指定分区数据
package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/**
 * 消费指定分区
 */
public class ConsumerMyPartition {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //指定消费者组的名字
        props.put("group.id", "testGroup4");
        //关闭自动提交offset值,改为手动提交
        props.put("enable.auto.commit", "false");
        //数据key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义kafkaComsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        //Collection<TopicPartition> partitions
        //创建一个集合 泛型为TopicPartition
        TopicPartition topicPartition = new TopicPartition("test", 0);
        TopicPartition topicPartition1 = new TopicPartition("test", 1);

        List<TopicPartition> topicPartitions = Arrays.asList(topicPartition, topicPartition1);

        //通过assign方法注册消费topic:test中的某些分区
        kafkaConsumer.assign(topicPartitions);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);
            //获取所有分区
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition topicPartition2 : partitions) {
                //获取一个分区中的所有数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition2);
                for (ConsumerRecord<String, String> record : records) {
                    int partition = record.partition();
                    String value = record.value();
                    String key = record.key();
                    long offset = record.offset();
                    System.out.println("数据的key为" + key + ",数据的value为" + value + ",数据的offset为" + offset + ",数据的分区为" + partition);
                }
                long offset = records.get(records.size() - 1).offset();
                kafkaConsumer.commitSync(Collections.singletonMap(topicPartition2, new OffsetAndMetadata(offset)));

            }
        }
    }
}

重复消费和数据丢失

在这里插入图片描述
以上图为例,Consumer需要将数据写入到Hbase后,再提交offset值。那么就可以有四种上传情况的发生:
一、写入Hbase成功,提交offset成功 —— 这就是正常的消费情况
二、写入Hbase失败,提交offset失败 —— 不会有什么影响,继续进行消费即可
三、写入Hbase成功,但是offset提交失败 —— 这就会造成重复消费
四、写入Hbase失败,但是offset提交成功 —— 这样就会造成数据丢失

Kafka一共有三种消费模型:
exactly once —— 没有出错
at least once —— 重复消费
at most once —— 数据丢失
出现后两种模型的原因一般是offset没有管理好
实际工作中大多数公司的解决办法是将offset的值保存到redis或者hbase当中

数据消费存在高阶API (High Level API)低阶API (High Level API)
高阶API是将offset值默认保存在zk中,早期的Kafka一般默认使用高阶API。
低阶API就是将offset值保存在kafka自带的一个topic种
在这里插入图片描述

原文地址:https://www.cnblogs.com/zzzsw0412/p/12772444.html