Kafka利用Java API自定义生产者,消费者,拦截器,分区器等组件

代码不会写怎么办?看官方写好的类,照着写一遍就行!

一、生产者

实现类

无需实现任何类,需要开启main方法

基本思路

new Properties() 来配置一些参数——>new KafkaProducer()  创建一个生产者对象,并将配置信息传入 ——> 调用生产者对象的send()方法来发送数据

代码示例

public static void main(String[] args) {

        //producer的配置信息
        Properties props = new Properties();
        // 服务器的地址和端口,也可用ProducerConfig中的属性来代替
           //ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // 接受服务端ack确认消息的参数,0,-1,1
        props.put("acks", "all");
        // 如果接受ack超时,重试的次数
        props.put("retries", 3);
        // sender一次从缓冲区中拿一批的数据量
        props.put("batch.size", 16384);
        // 如果缓冲区中的数据不满足batch.size,只要和上次发送间隔了linger.ms也会执行一次发送
        props.put("linger.ms", 1);
        // 缓存区的大小
        props.put("buffer.memory", 33554432);
        //配置生产者使用的key-value的序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // <key,value>,泛型,必须要和序列化器所匹配
        Producer<Integer, String> producer = new KafkaProducer<>(props);
        
        
        for (int i = 0; i < 10; i++){
        //发送数据,此处演示带回调函数的异步发送
        //异步带回调的发送
            producer.send(new ProducerRecord<Integer, String>("test2", i, "atguigu" + i), new Callback() {
              //  一旦发送的消息被server通知了ack,此时会执行onCompletion()
                // RecordMetadata: 当前record生产到broker上对应的元数据信息
                // Exception: 如果发送失败,会将异常封装到exception返回
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    //没有异常
                    if (exception==null){
                        //查看数据的元数据信息
                        System.out.println("partition:"+metadata.topic()+"-"+metadata.partition()+",offset:"+
                                metadata.offset());
                    }

                }
            });
            
            /**
            如果要实现同步发送,只需要调用get()方法,使main线程阻塞即可
            
            RecordMetadata result=producer.send(new ProducerRecord()).get();
            
            */

        producer.close();


    }

二、拦截器

实现类

MyInterproter2 implements ProducerInterceptor

基本思路


在onsend()方法里对record进行业务逻辑的操作,之后封装成一个全新的ProducerRecord

代码示例

public class TimeStampInterceptor implements ProducerInterceptor<Integer,String> {

    //拦截数据
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {

        String newValue=System.currentTimeMillis()+"|"+record.value();

        return new ProducerRecord<Integer, String>(record.topic(),record.key(),newValue);
    }

    //当拦截器收到此条消息的ack时,会自动调用onAcknowledgement()
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    // Producer关闭时,调用拦截器的close()
    @Override
    public void close() {

    }

    //读取Producer中的配置
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

在Producer中设置

        //拦截器链
        ArrayList<String> interCeptors = new ArrayList<>();

        // 添加的是全类名,注意顺序,先添加的会先执行
        interCeptors.add("com.atguigu.kafka.custom.TimeStampInterceptor");
        interCeptors.add("com.atguigu.kafka.custom.CounterInterceptor");
         //设置拦截器
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interCeptors);

三、分区器

实现类

MyPartitioner implements Partitioner

代码实现

public class MyPartitioner implements Partitioner {

    //为每个ProduceRecord计算分区号
    // 根据key的hashCode() % 分区数
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取主题的分区数
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Producer执行close()方法时调用
    @Override
    public void close() {

    }

    // 从Producer的配置文件中读取参数,在partition之前调用
    @Override
    public void configure(Map<String, ?> configs) {

        System.out.println(configs.get("welcomeinfo"));

    }
}

在producer中设置

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.custom.MyPartitioner");

 四、消费者

实现类

无需实现任何类,只需要开启mian线程

基本思路

跟生产者一样,都需要新建一个Properties,往里面设置各种参数,然后新建KafkaConsumer对象,将参数传进去,调用 consumer.subscribe()订阅主题,之后再调用 consumer.poll()拉取数据进行消费

注意点:

可以配置是否自动提交offset(提交的offset为当前消费的消息offset +1),自动提交参数配置,自动提交可能会造成数据漏消费

 // 允许在消费完数据后,自动提交offset
        props.put("enable.auto.commit", "true");
        // 每次自动提交offset的间隔时间
        props.put("auto.commit.interval.ms", "1000");

如果要手动提交,需要将enable.auto.commit设置为false,然后在消费逻辑结束后调用提交方法,手动提交可能会造成数据重复消费

//手动同步提交  等offset提交完成后,再继续运行代码
            consumer.commitSync();
            //手动异步提交
            consumer.commitAsync();

如果是独立消费者,需要调用consumer.assign()来确定主题和分区,调用consumer.seek()方法来定位到具体的offset进行消费

//要订阅的分区
        List<TopicPartition> partitions=new ArrayList<>();

        TopicPartition tp1 = new TopicPartition("test1", 0);
        TopicPartition tp2 = new TopicPartition("test1", 1);

        partitions.add(tp1);
        partitions.add(tp2);

        //分配主题和分区
        consumer.assign(partitions);

        //指定offset
        consumer.seek(tp1,20);
        consumer.seek(tp2,30);

代码示例1(自动提交)

public static void main(String[] args) {

        // consumer的配置
        Properties props = new Properties();
        // 连接的集群地址
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // 消费者组id
        props.put("group.id", "test");
        // 消费者id
        props.put("client.id", "test01");
        // 允许在消费完数据后,自动提交offset
        props.put("enable.auto.commit", "true");
        // 每次自动提交offset的间隔时间
        props.put("auto.commit.interval.ms", "1000");
        // key-value的反序列化器,必须根据分区存储的数据类型,选择合适的反序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //基于配置创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅主题,需要传入一个主题集合
        consumer.subscribe(Arrays.asList("test1"));
        //消费数据,采取poll的方式主动去集群拉取数据
        while (true) {
            //每次poll,拉取一批数据,如果当前没有可用的数据,就休息timeout单位时间
            ConsumerRecords<String, String> records = consumer.poll(100);
            //遍历数据,执行真正的消费逻辑
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

    }
View Code

代码示例2(手动提交时开启事务来避免数据重复消费)

//全局变量
 int offset=当前处理的记录的offset
while (true) {
    
    try{
            //开启事务
            xxxxx
            //每次poll,拉取一批数据,如果当前没有可用的数据,就休息timeout单位时间
            ConsumerRecords<String, String> records = consumer.poll(100);
            //遍历数据,执行真正的消费逻辑
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                //记录当前处理的位置
                offset=record.offset();
            }

            //手动同步提交  等offset提交完成后,再继续运行代码
            //consumer.commitSync();
            //手动异步提交
            consumer.commitAsync();
            
        // 提交事务
        xxxx
       }catch(Exception e){
             //回滚事务
        xxxxxx
//在catch中,提交之前已经处理的offset,自己维护提交的offset,例如将Offset存储到mysql中! 将offset存储,提交到mysql中 } } }

代码示例3(自定义offset存储位置)

public class CustomConsumer {

    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

public static void main(String[] args) {

//创建配置信息
        Properties props = new Properties();

//Kafka集群
        props.put("bootstrap.servers", "hadoop102:9092"); 

//消费者组,只要group.id相同,就属于同一个消费者组
        props.put("group.id", "test"); 

//关闭自动提交offset
        props.put("enable.auto.commit", "false");

        //Key和Value的反序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //创建一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //消费者订阅主题
        //ConsumerRebalanceListener 可在rebalance时进行调用
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
            
            //该方法会在Rebalance之前调用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //该方法会在Rebalance之后调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commitOffset(currentOffset);//异步提交
        }
    }

    //获取某分区的最新offset
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    //提交该消费者所有分区的offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {

    }
}
原文地址:https://www.cnblogs.com/yangxusun9/p/12506344.html