kafka-producer api使用总结

和性能相关的问题,可以通过上一篇总结的参数来琢磨配置,到这里,业务可以正常运作,正常发送不是问题

但是业务如果要严谨,需要关注异常情况怎么处理,这里着重总结发送失败的处理方式,另外需要看是用原生的

kafka-client还是springboot集成的springboot-kafka方式实现

kafka-clients方式

下面是KafkaProducer.class及其关键代码(KafkaProducer是线程安全的)

Producer producer = new KafkaProducer(kafkaProperties);
ProducerRecord<String,String> record = new ProducerRecord<>("topic1","key1","value111");

//这是最终的send方法
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)

第一种,不管不顾,疯狂发送,就是send,这样写就等着被打回

producer.send(record);

第二种,同步方式,调用的是Future.get()来阻塞调用send()的线程,在catch中处理异常,这里存在两种情况

1、一切OK,就是发送失败了,比如连接突然断开,比如leader挂了,刚好选举,这时候retries属性就用上了

2、消息本身不对,比如超过设置的大小,会直接抛出异常try

  producer.send(record).get(); 
} catch (Exception e) { 
  //处理你的异常,这里无法拿到发送的record消息,需要设计
}

第三种,异步方式,异常通过回调函数处理,而不是发送时出现异常立即就处理,骨架代码如下:

producer.send(record, new Callback() {
  @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        //do something
    }
}); 

查看Callback接口,可以看到不管是RecordMetadata还是Exception,都是不包含发送的record的

public interface Callback {
    void onCompletion(RecordMetadata var1, Exception var2);
}
public final class RecordMetadata {
    public static final int UNKNOWN_PARTITION = -1;
    private final long offset;
    private final long timestamp;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final TopicPartition topicPartition;
    private volatile Long checksum;
    ......
}

所以需要实现我们自己的org.apache.kafka.clients.producer.Callback接口,想办法传进去record,常规的设计思路,当然是增加一个属性,有参构造函数传进去

class MyCallback implements Callback {
   private Object msg;
   public MyCallback(Object msg) {
       this.msg = msg;
   }
   
   @Override
   public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception!=null){ do something with msg} } } producer.send(record,
new MyCallback(record));

springboot-kafka方式

先说下几个点,springboot-kafka包是包含kafka-client包的,用法差异比价大

首先初始化一个KafkaTemplate

@Component
public class KafkaConfig {

    @Value("${kafka.brokers}")
    private String brokers;

    @Autowired
    public KafkaProducerListener producerListener;

    /**
     * producer,方法名就是注入时候的属性变量名
     * 使用时请注入属性:KafkaTemplate kafkaTemplate
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> configs = new HashMap<>(16);
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        //重试次数
        configs.put(ProducerConfig.RETRIES_CONFIG, 3);
        //批次发送的大小,单位是byte
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //请求超时时间默认给30s,每次retry都是一个完整的30s
        configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        //这样可以保证最大程度的消息发送不丢失
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        //延迟发送的时间,目的是尽量使batch满了之后才发送,默认0
        configs.put(ProducerConfig.LINGER_MS_CONFIG, 50);
        //生产者用来缓存等待发送到服务器的消息的内存总字节数,这里是默认值
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaTemplate template = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
        template.setProducerListener(producerListener);
        return template;
    }

}

然后是send方法,一般带回调的是这样

public void sendAndCallback(String msg) {
       ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, msg);
       future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
           @Override
           public void onSuccess(SendResult<String, Object> result) {               
           }
           
           @Override
           public void onFailure(Throwable ex) {              
           }
       });
   }

可以看到onSuccess时,可以拿到SendResult,但是onFailure时,只能拿到Throwable ex,也没法拿到msg,这是SendResult的属性

public class SendResult<K, V> {
    private final ProducerRecord<K, V> producerRecord;
    private final RecordMetadata recordMetadata;
}

跟踪下send方法,在KafkaTemplate这个类里面最后找到如下位置:

 

 可以看到producerListener的onError方法是可以处理发送失败的日志的,所以需要在定义KafkaTemplate时就定义自己的producerListener,然后set进去

定义自己的ProducerListener:

@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener<String, Object> {

    @Override
    public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {
        log.info("message send success:[]",value);
    }

    @Override
    public void onError(String topic, Integer partition, String key, Object value, Exception exception) {
        //消息发送到失败队列,或存库,或重试        
    }

}

在上面初始化KafkaTemplate的时候,

template.setProducerListener(producerListener);

如此即可处理发送失败的消息,保证发送过程中消息尽量不丢失

最后说一下,KafkaProducer类是线程安全的,并且producer端是不支持batch发送一个List,然后多条msg到topic的

原文地址:https://www.cnblogs.com/yb38156/p/14721209.html