KAFKA consumer常用api

Kafka中如何创建消费者Consumer已经在前面给大家详细的讲解过,那么如何使用JAVA来消费topic中的数据呢呢,今天就说说。
还是先创建一个topic,拥有一个副本和一个分区

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
示例:

一 自动提交offset

    public class MyConsumer{
        public static void main(String[] args) {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//或者
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.5.150:9092");

            prop.put("group.id", "testGroup1"); //必须要组名
//或者
            prop.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup1");

            prop.put("enable.auto.commit", "true");//默认值true 指定为自动提交offset
//或者
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");


            prop.put("auto.commit.interval.ms", "1000");//默认值5000
//或者
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

            prop.put(auto.offset.reset, "earliest"); //相当于从开始读 --from-beginning
//或者
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//反序列化
            prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//或者
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

            prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//或者
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

//创建consumer对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer(prop);
//消费者订阅的topic, 可同时订阅多个,指定topic名字
            consumer.subscribe(Arrays.asList("mytest"));//subscribe不能指定partition


//从topic消费数据
            consumer.assign(Arrays.asList(new TopicPartition("mytest",0))); //分区号是0分区,就是第一个分区 assign可以指定partiton

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000); //按时间段读取数据,读取超时时间为1000ms
                for (ConsumerRecord<String, String> record : records){
                    System.out.printf(
                            record.partition(),record.offset(), record.key(), record.value());
                }
            }
        }
    }

解析:

bootstrap.servers 只是代表kafka的连接入口,只需要指定集群中的某一broker,全部指定也没关系;
一旦consumer和kakfa集群建立连接,consumer会以心跳的方式来高速集群自己还活着,如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence。

二 手动提交offset

如果consumer在获得数据后需要加入处理,数据完毕后才确认offset,需要程序来控制offset的确认。举个例子:
consumer获得数据后,需要将数据持久化到DB中。自动确认offset的情况下,如果数据从kafka集群读出,就确认,但是持久化过程失败,就会导致数据丢失。我们就需要控制offset的确认。

Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("group.id", "testGroup1"); //必须要组名

//指定为手动提交offset
props.put("enable.auto.commit", "false");//默认值true
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");

props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //创建consumer对象
    KafkaConsumer<String, String> consumer = new KafkaConsumer(prop);
//消费者订阅的topic, 可同时订阅多个,指定topic名字
consumer.subscribe(Arrays.asList("mytest"));

    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
//数据达到批量要求,就写入DB,同步确认offset
        if (buffer.size() >= minBatchSize) {
//提交offset,只要是未提交,表示消息没有被消费,下次重启的时候会继续消费
            insertIntoDb(buffer);
            consumer.commitAsync();
            buffer.clear();
        }
    }

三 指定从topic的特定分区的特定某个offset开始消费

//创建consumer对象
    KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("mytest")); //指定topic名字

//从topic消费数据
consumer.assign(Arrays.asList(new TopicPartition("mytest",0))); //分区号是0分区,就是第一个分区
//消费2号分区的数据
consumer.seek(testpartition2,offset:50)//指定从topic的2号分区的某个offset开始消费
//从头开始消费
            consumer.seekToBeginning(Arrays.asList(testpartition2)

            while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf(
                    "partition=%d,offset=%d,key=%s,value=%s%n",
                    record.partition(),record.offset(),record.key(), record.value())
    }
}

说明:确认的offset为已接受数据最大offset+1。

分区订阅
可以向特定的分区订阅消息。但是会失去partion的负载分担。有几种场景可能会这么用:

只需要获取本机磁盘的分区数据;
程序自己或者外部程序能够自己实现负载和错误处理。例如YARN/Mesos的介入,当consumer挂掉后,再启动一个consumer。
String topic = "mytest"
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
1
2
3
说明:

此种情况用了consumer Group,也不会做负载均衡。
topic的订阅和分区订阅不可以在同一consumer中混用。
外部存储offset
消费者可以自定义kafka的offset存储位置。该设计的主要目的是让消费者将数据和offset进行原子性的存储。这样可以避免上面提到的重复消费问题。举例说明:
订阅特定分区。存储所获得的记录时,将每条记录的offset一起存储。保证数据和offset的存储是原子性的。当异步存储被异常打断时,凡已经存储的数据,都有相应的offset记录。这种方式可以保证不会有数据丢失,也不会重复的从服务端读取。
参数配置:

去使offset自动确认:enable.auto.commit=false;
从ConsumerRecord中获取offset,保存下来;
Consumer重启时,调用seek(TopicPartition, long)重置在服务端的消费记录。
如果消费分区也是自定义的,这种方式用起来会很爽。如果分区是自动分配的,当分区发生reblance的时候,就要考虑清楚了。如果因为升级等原因,分区漂移到一个不会更新offset的consumer上,那就不好处理了。
该情况下:

原Consumer需要监听分区撤销事件,并在撤销时确认好offset。接口:ConsumerRebalanceListener.onPartitionsRevoked(Collection);
新Consumer监听分区分配事件,获取当前分区消费的offset。接口:ConsumerRebalanceListener.onPartitionsAssigned(Collection)
Consumer监听到 ConsumerRebalance事件,还没有处理或者持久化的缓存数据flush掉。
控制消费位置
大多数情况下,服务端的Consumer的消费位置都是由客户端间歇性的确认。Kafka允许Consumer自己设置消费起点,达到的效果:
1.可以消费已经消费过的数据;
2. 可以跳跃性的消费数据;
看下这样做的一些场景:

对Consumer来说,数据具备时效性,只需要获取最近一段时间内的数据,就可以进行跳跃性的获取数据;
上面自己存offset的场景,重启后就需要从指定的位置开始消费。
控制消费流Consumption Flow Control
如果一个consumer同时消费多个分区,默认情况下,这多个分区的优先级是一样的,同时消费。Kafka提供机制,可以让暂停某些分区的消费,先获取其他分区的内容。场景举例:

流式计算,consumer同时消费两个Topic,然后对两个Topic的数据做Join操作。但是这两个Topic里面的数据产生速率差距较大。Consumer就需要控制下获取逻辑,先获取慢的Topic,慢的读到数据后再去读快的。
同样多个Topic同时消费,但是Consumer启动是,本地已经存有了大量某些Topic数据。此时就可以优先去消费下其他的Topic。
调控的手段:让某个分区消费先暂停,时机到了再恢复,然后接着poll。接口:pause(TopicPartition…),resume(TopicPartition…)

总结

1 subscribe 只能指定topic名字,入参是list

2 assign可以指定topic和partition

3 seek指定topic和partition,还有offset

原文地址:https://www.cnblogs.com/juniorMa/p/15693285.html