Spring-kafka —— 生产者消费者重要配置

一、生产者配置

属性 描述 类型 默认值 重要性

bootstrap.servers

用于建立与kafka集群的连接,这个list仅仅影响用于初始化的hosts,来发现全部的servers。
格式:host1:port1,host2:port2,…,数量尽量不止一个,以防其中一个down了
list ""
 

acks

 procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
  • acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。

   在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。

  • acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,

         在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。

  • acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,

     这是最强有力的保证,这相当于acks = -1的设置。

  • 可以设置的值为:all, -1, 0, 1
string  1  高

retries

发送失败重试次数。

设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。

允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。

string 1  高

retry.backoff.ms

发送失败,每次重试的间隔毫秒数。 long  100   低

buffer.memory

producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。

这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。

一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。

long

33554432 

(32M)

 高

batch.size

批处理大小。

每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位)

int   16384

(16K)

中 

linger.ms

生产者将在请求传输之间到达的所有记录组合到一个个Batch中。一个Batch被创建之后,最多过linger.ms,不管这个Batch有没有写满,都必须发送出去了。 long  中 

max.request.size

请求的最大大小(字节)。

这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值。

int 

1048576

(1M)

中 

compression.type

producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好 string none

key.serializer

key的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口
class  

value.serializer

值的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口
class  

client.id

当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。

这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪

string ""

client.dns.lookup

控制客户端如何使用DNS查找。 string default
connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接。 long 540000

delivery.timeout.ms

调用send()返回后报告成功或失败的时间上限。

此配置的值应大于或等于 request.timeout.ms 加 linger.ms 的总和.

int 120000

max.block.ms

控制block的时长,当buffer空间不够或者metadata丢失时产生block. long 60000

request.timeout.ms        

配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。 int 30000

partitioner.class

实现 org.apache.kafka.clients.producer.Partitioner 接口

默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

class  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

二、消费者配置

属性 描述 类型 默认值 重要性

bootstrap.servers

以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 list ""

group.id

用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group                                                   string null

fetch.min.bytes

服务器应为获取请求返回的最小数据量。如果可用的数据不足,则请求将在响应请求之前等待该数据的累积。

默认设置为1字节意味着,只要有一个字节的数据可用,或者提取请求在等待数据到达时超时,就会响应提取请求。

int 1

fetch.max.bytes

服务器应为获取请求返回的最大数据量。记录由使用者分批获取,如果获取的第一个非空分区中的第一个记录批大于此值,

则仍将返回该记录批,以确保使用者能够取得进展。

int   52428800 中 

max.partition.fetch.bytes

服务器将返回的每个分区的最大数据量。记录由消费者分批提取。

如果fetch的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批以确保使用者能够取得进展。

int  1048576

heartbeat.interval.ms

使用Kafka的组管理工具时,消费者协调器的心跳之间的预期时间。

心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,

但通常应设置为不高于该值的1/3。它可以调整得更低,以控制正常再平衡的预期时间。

int 3000

session.timeout.ms

使用Kafka的组管理工具时用于检测客户端故障的超时。

如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了,并且reblance将会产生,

必须在[group.min.session.timeout.ms, group.max.session.timeout.ms]范围内

int  10000

request.timeout.ms

配置控制客户端等待请求响应的最长时间。

如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。 

int  3000

key.deserializer

key的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口 class  

value.deserializer

值的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口 class  

allow.auto.create.topics

允许在订阅或分配主题时在代理上自动创建主题。

只有当代理允许使用 auto.create.topics.enable 的情况下才生效。

boolean true

exclude.internal.topics

是否应将与订阅模式匹配的内部主题从订阅中排除。始终可以显式订阅内部主题。 boolean true

enable.auto.commit

启动自动提交。

如果为真,则用户的偏移量将在后台定期提交。

boolean true

auto.commit.interval.ms

使用者偏移自动提交到Kafka的频率(以毫秒为单位),enable.auto.commit设置为true。  int 5000 低 

auto.offset.reset

当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,由于该数据已被删除):

  • earliest:将偏移量自动重置为最早的偏移量
  • latest:自动将偏移量重置为最新偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
  • anything else:向消费者抛出异常
string latest

max.poll.interval.ms

使用使用者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。

如果在此超时过期之前未调用poll(),则认为使用者失败,该组将重新平衡,以便将分区重新分配给另一个成员。

对于使用非空group.instance.id组如果达到此超时,则不会立即重新分配分区。

int 300000

max.poll.records

在对poll()的单个调用中返回的最大记录数。

max.poll.records条数据需要在session.timeout.ms这个时间内处理完

int 500

client.dns.lookup

控制客户端如何使用DNS查找。 string default

connections.max.idle.ms

在此配置指定的毫秒数之后关闭空闲连接。 long 540000

default.api.timeout.ms

指定客户端API的超时(毫秒)。  int 60000

group.instance.id

最终用户提供的使用者实例的唯一标识符。  string  null

partition.assignment.strategy

在“range”和“roundrobin”策略之间选择一种作为分配partitions给consumer 数据流的策略; 循环的partition分配器分配所有可用的partitions以及所有可用consumer 线程。

它会将partition循环的分配到consumer线程上。如果所有consumer实例的订阅都是确定的,则partitions的划分是确定的分布。循环分配策略只有在以下条件满足时才可以:

(1)每个topic在每个consumer实例上都有同样数量的数据流。(2)订阅的topic的集合对于consumer group中每个consumer实例来说都是确定的

list class

send.buffer.bytes

发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。如果值为-1,将使用操作系统默认值。 int 131072 中 

receive.buffer.bytes

读取数据时要使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,将使用操作系统默认值。  int  65536

client.id

请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名来跟踪请求源,而不仅仅是ip/端口。 string ""

fetch.max.wait.ms

Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。

这个配置就是来配置consumer最多等待response多久。

int 500

引用官网:http://kafka.apache.org/documentation/#consumerconfigs

https://www.cnblogs.com/yx88/p/11013338.html

 

 

 

 

原文地址:https://www.cnblogs.com/caoweixiong/p/11187404.html