Kafka Product

流程:

1.product首先使用一个线程(用户主线程,也就是用户启动producer的线程)将待发送的消息封装进一个producerRecord类实例,然后将其系列化之后发给partitioner,
再由后者确定了目标分区后一同发送到位于product程序中的一块内存缓冲区中。而product的另一个工作线程(I/O发送线程,也成sender线程)则负责实时的从缓冲区中
提取处准备就绪的消息封装进一个批次,同一发送给对应的broker。

如图:

发送消息方式(3种)

1.同步发送
2.异步发送+回调
3.fire and forget (发送之后不再理会)
product send方法提供回调参数实现异步发送以及对发送结果的响应。也可以自定义回调函数 需要实现org.apache.kafka.clients.producer.Callback接口

错误(2类)

1.可重试异常(继承org.apache.kafka.common.errors.RetriableException)
- LeaderNotAvailableException:通常处于leader换届选举期间,瞬时异常,重试后恢复。
- NotControllerException:通常表明controller在经历新一轮选举,重试机制自行恢复。
- NetworkException:网络异常。
2.不可重试异常
- RecoedTooLargerException:发送的消息尺寸过大,超过规定上限。
- SerializationException:序列化失败异常。
- KafkaException:其他类型的异常。

发送参数acks

- 0
  不理睬leader broker端的处理结果,立即发送下一条消息,producer.send的回调无效。但是吞吐量是最高的。
- all
  leader broker 不仅会将消息写进本地日志,同事还会等待ISR(副本集合)中索引偶其他副本成功写入,保证消息不丢失,但吞吐量是最低的。
- 1
  折中的方案,默认的参数。producer发送消息后leader broker仅将消息写进日志,然后发送响应结果给producer,无需等待ISP中其他副本写入该消息。
  即可以保证适当的消息持久性。也保证producer端的吞吐量。
原文地址:https://www.cnblogs.com/snail-gao/p/12924435.html