Kafka 自定义拦截器ProducerInterceptor

案例:

自定义两个Producer 连接器,一个在消息上添加时间戳,一个统计消息成功失败个数(当然这两个拦截器可以写在一块,我们现在分两个拦截器来实现)。

1:Pom文件 添加如下代码.(slf4j 依赖 是为了去编译警告的。可以不加) 

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

2:编写TimeInterceptor  拦截器

package com.Interceptor;



import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;


public class TimeInterceptor  implements ProducerInterceptor<String,String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {

        //读取 数据
        String value = producerRecord.value();
        value = System.currentTimeMillis() + value;
        //穿件一个新的ProducerRecord对象
        return  new ProducerRecord<String, String>(producerRecord.topic(),producerRecord.partition(),producerRecord.key(),value);
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3:编写成功统计拦截器

package com.Interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String,String>  {
    private  int success;
    private  int errors;
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {

        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if(recordMetadata !=null)
        {
            success++;
        }
        else
        {
            errors++;
        }

    }

    @Override
    public void close() {
        System.out.println("Success : " + success);
        System.out.println("Errors : " + errors);
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

4:添加拦截器到生产者中.

package com.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.Properties;

public class InterceptorProducer {
    public static void main(String[] args) {
        //Create kafka 生产者配置信息
        Properties properties = new Properties();
        //kafka 集群, broker list
        properties.put("bootstrap.servers", "hadoop101:9092");
        properties.put("acks", "all");
        //重试次数
        properties.put("retries", 1);
        //批次大小
        properties.put("batch.size", 16384);
        //等待时间
        properties.put("linger.ms", 1);
        //RecordAccumulator 缓冲区大小 32M
        properties.put("buffer.memory", 33554432);
        // key value 的序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //添加多个拦截器
        ArrayList<String> interceptors=  new ArrayList<String> ();
        interceptors.add("com.Interceptor.TimeInterceptor");
        interceptors.add("com.Interceptor.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
        //创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        //发送数据
        for(int i = 11 ;i <= 20;i++)
        {
            producer.send(new ProducerRecord<String, String>("bbb","Kafkakafka--"+i));
        }

        //关闭连接
        producer.close();

    }
}

5:启动消费者

bin/kafka-console-consumer.sh  --zookeeper hadoop101:2181 --topic bbb

6:启动生产者,查看运行结果.

 

原文地址:https://www.cnblogs.com/kpwong/p/13780541.html