kafka-producer使用总结

bootstrap.servers

producer端配置的kafka集群的地址,是一个ip:port的list,逗号隔开,可以只填一个,kafka会自动发现集群里

其他的broker,但是如果配置的这个broker正好挂了,那就不行了

key.serializer/value.serializer

这个比较固定,一般都是实现 org.apache.kafka.common.serialization.Serializer 接口的 Serializer 类

acks  重磅属性

  • acks=0-------》不能保证发送的消息被server接收,重试机制不生效

生产者将不等待任何消息确认,消息将立刻添加到socket缓冲区并考虑发送

  • acks=1--------》只管leader,不管follower,这个是默认值,因此默认producer可以保证at least once

leader写入消息到本地日志就立即响应,不等待所有follower应答。

如果消息响应之后、follower还未复制之前,leader立即故障,消息将会丢失

  • acks=all 或 acks=-1

leader将等待所有副本同步后应答消息表示写入成功。配合min.insync.replicas设置可保证消息不丢失

retries/retry.backoff.on

默认值为0,不重试。设置大于0的值的话,客户端如果发送失败则会重新发送

这个重试功能和客户端在接到错误响应之后重新发送功能和效果是一样的

注意:如果要保证消息发送的顺序,还要配合设置 max.in.flight.requests.per.connection=1

因为如果2个批次发送到一个分区,第1个失败了并重试,但是第2个成功了,那么第2个批次将早于第一个发送完成

对于多次重试失败的,两次重试之间的间隔由retry.backoff.on来确定,默认100ms

max.in.flight.requests.per.connection

阻塞之前,客户端单个连接上的未应答的请求最大多少个时产生阻塞。默认值为5

如上,如果设置大于1且发送失败,会由于重试(如果开启了重试)会导致消息重新排序的风险

batch.size   单位是byte

生产者一般是将多条消息打包发送到分区,来减少请求交互。这样有助于客户端和server的性能提升,

打包的单个批次的消息大小不能超过这个设置值,设置太小会降低吞吐量,太大又可能耗费内存,也是需要根据业务琢磨的

connections.max.idle.ms

闲置的连接多少秒之后关闭

linger.ms

这个设置是为了满足这样的场景:消息不是很快(负载比较低)或单条消息不是很大,需要积攒若干次才能接近batch.size这么大

设置这个可以减少批次请求的次数,攒够linger.ms时间后再一次性发送。

同样的有利有弊,看场景,如果比较闲就攒攒,如果来消息比送消息快,那就不要设置了,默认是0

request.timeout.ms(30s)/max.block.ms(60s)

官网没解释清楚两者的区别,查了下cwiki,描述的很清楚,这里贴一下

1.max.block.ms - the maximum time producer.send() and partitionsFor() will block.  

  For send() it includes:     

    metadata fetch time     

    buffer full block time     

    serialization time (customized serializer)     

    partitioning time (customized partitioner)

  For partitionsFor() it includes:     

    metadata fetch time

2.request.timeout.ms - the maximum time to wait for the response of a message AFTER the batch is ready,

  including: actual network RTT and server replication time

3.Some thing to clarify:   

  request.timeout.ms only starts counting down after a batch is ready.   

  request.timeout.ms does not include retries - each try has a full request.timeout.ms. The reason ...   ......

request.timeout.ms意思是请求超时的这个时间不包括retry的时间,每次retry都会重新有一个完整的超时时间

max.block.ms是当buffer满了 或 metadata获取不到(比如leader挂了) 或 序列化没完成 或 分区函数没计算完成等情况下的最大阻塞时间

reconnect.backoff.ms/reconnect.backoff.max.ms

如果producer与server断开,让我设计的话肯定会设计重连的逻辑,要考虑2个问题(当然是设计者,学习者得站在设计的角度思考)

第1个问题,前一次重连失败后间隔多久再重连,这就是reconnect.backoff.ms;

第2个问题,重连的时候多久算连接失败,总不能尝试连接的时候一直等吧,这就是reconnect.backoff.max.ms,指的是重连的总的最大

时间。另外每次连接失败,重连时间都会指数级增加,每次增加的时间会存在20%的随机抖动,以避免连接风暴。

来了来了,有点springboot开发中接口调用重试设计的味道了^-^

enable.idempotence(默认false)

尝鲜,producer端的幂等性保证

当设置为true,生产者将确保每个消息正好一次复制写入到stream。
当设置为false,如果broker故障,生产者重试。即,可以在流中写入重试的消息。
启用幂等时需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零,acks必须设置为all
如果这些值设置为与幂等生成器不兼容的值,将抛出一个ConfigException异常

producer发送消息的主要过程

  1. 构造ProducerRecord 对象,对象中声明topic、value、partition、key(后两个非必须)
  2. 调用send() 方法,发送消息
  3. 消息内容序列化,因为消息要在网络上传输,必须进行序列化(配置的序列化器key、value对象序列化成字节数组)
  4. 计算发送到哪个分区,如果ProducerRecord指定了partition,分区器直接返回指定的分区;否则分区器根据 key 计算出一个分区
  5. 将记录添加到一个batch里,这个batch里所有的消息会被发送到相同的topic和partion。一个独立的线程会将这些batch发送到相应的Broker
  6. Broker成功接收到消息,响应给producer,返回消息的元数据(包括主题和分区信息以及记录在分区里的偏移量)发送失败,可以选择重试或者直接抛出异常
原文地址:https://www.cnblogs.com/yb38156/p/14585245.html