Kafka常用操作

#kafka集群搭建教程
https://blog.csdn.net/zxy987872674/article/details/72466504/

#创建topic以及分区和副本数量
./kafka-topics.sh --create --zookeeper 192.168.10.33:2181,192.168.10.34:2181 --topic test-topic --partitions 2 --replication-factor 2


#发送消息
sh kafka-console-producer.sh --broker-list 192.168.10.33:9092,192.168.10.34:9092 --topic test-topic-partition-1

#消费消息
sh kafka-console-consumer.sh --bootstrap-server 192.168.10.33:9092 --topic test-topic --from-beginning
--from-beginning表示从头开始消费

指定消费组GroupID
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.33:9092,192.168.10.34:9092 --topic test-topic-89 --from-beginning --group console-group-dsz


#消费者列表
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.33:9092,192.168.10.34:9092 --list

#查看消费组属性
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.33:9092,192.168.10.34:9092 --describe --group COUNT


#查看topic分布情况
./bin/kafka-topics.sh --describe --zookeeper 192.168.10.33:2181,192.168.10.34:2181 --topic __consumer_offsets


#启动zookeeper
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >zk.log &

#停止zookeeper
./zookeeper-server-stop.sh

#启动kafka
nohup ./bin/kafka-server-start.sh config/server.properties >kafka.log &


#zookeeper配置
dataDir=/tmp/kafka/zookeeper
dataLogDir=/tmp/kafka/log/zookeeper
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.1=192.168.10.33:2888:3888
server.2=192.168.10.34:2888:3888
--
mkdir -p /tmp/kafka/zookeeper && mkdir -p /tmp/kafka/log/zookeeper

#查看topic列表
sh kafka-topics.sh --list --zookeeper 192.168.10.33:2181,192.168.10.34:2181
sh /usr/local/kafka/kafka_2.11-2.0.0/bin/kafka-topics.sh --list --zookeeper 192.168.10.33:2181,192.168.10.34:2181
--
__consumer_offsets
test-topic
test-topic-1
test-topic-3


#删除topic
./bin/kafka-topics.sh --delete --zookeeper 【zookeeper server】 --topic 【topic name】
sh /usr/local/kafka/kafka_2.11-2.0.0/bin/kafka-topics.sh --delete --zookeeper 192.168.10.33:2181,192.168.10.34:2181 --topic test-topic
彻底删除topic:
1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。


#列出topic所有消息列表
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.10.33:9092,192.168.10.34:9092 --topic test-topic-89 --time -1
test-topic-89:0:12
test-topic-89:1:6


#查看topic属性
sh kafka-topics.sh --describe --zookeeper 192.168.10.33:2181,192.168.10.34:2181 --topic test-topic-5
--
Topic:test-topic-5 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test-topic-5 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test-topic-5 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic:主题名称。
Leader:当前分区的leader broker id在配置kafka集群的时候有设置。
Replicas:所有副本在broker分布情况。1,2表示分辨分布在1和2的broker上面。
Isr: 所有副本中跟分区leader数据同步最接近的几个副本。便于leader的重新选举,优先考虑Isr列表中的副本。当然也可以设置不从这里面选举leader。


#__consumer_offsets 参考:https://www.cnblogs.com/huxi2b/p/6061110.html
-->在获取__consumer_offsets(执行以下命令之前)需要设置kafka的config中consumer.properties中exclude.internal.topics=false,无需重启kafka会自动生效。
bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 11 --bootstrap-server 192.168.10.33:9092,192.168.10.34:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning


#修改分区数量(一般不建议修改,在创建topic的时候就指定好)
发送消息
sh kafka-console-producer.sh --broker-list 192.168.10.33:9092,192.168.10.34:9092 --topic test-topic-fix
查看:
sh kafka-topics.sh --describe --zookeeper 192.168.10.33:2181,192.168.10.34:2181 --topic test-topic-fix
Topic:test-topic-fix PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test-topic-fix Partition: 0 Leader: 2 Replicas: 2 Isr: 2
修改:(分区只能调整增加,不能修改减少分区相对原来的分区数)
sh kafka-topics.sh --zookeeper 192.168.10.33:2181,192.168.10.34:2181 --topic test-topic-fix --partitions 2 --alter


#kafka扩容后的分区重新负载分配(或者调整副本数量,这里可以增加或者减少副本数量),手动操作。
参考博主:https://blog.csdn.net/CREATE_17/article/details/110212789

#kafka可视化工具下载地址,以及使用教程。
https://www.kafkatool.com/download2/offsetexplorer_64bit.exe(下载地址)
https://www.cnblogs.com/miracle-luna/p/11299345.html(tool使用教程)


#关闭kafka自动创建topic,重启生效。这样可以防止代码端任意创建topic脏数据。
auto.create.topics.enable=false

#发送端相关调优参数

batch.size
生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式
来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb,
意味着当一批消息大小达到指定的batch.size的时候会统一发送

linger.ms
Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞
吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。
这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle
算法,也就是基于小包的等-停协议。

batch.size和linger.ms这两个参数是kafka性能优化的关键参数,很多同学会发现batch.size和
linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配
置的时候,只要满足其中一个要求,就会发送请求到broker上

消息发送幂等性 参考:https://blog.csdn.net/asdfsadfasdfsa/article/details/104806981
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
properties.put(“enable.idempotence”, true);

max.in.flight.requests.per.connection=1类似于数据库连接池,单次只能操作一条消息而且只能等到成功后再处理下一条消息。
为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5
其实这里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。
假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 发送失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)

#消费端

group.id
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以
有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有
消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由
同一个消费组内的一个consumer来消费.如下图所示,分别有三个消费者,属于两个不同的group,那
么对于firstTopic这个topic来说,这两个组的消费者都能同时消费这个topic中的消息,对于此事的架构
来说,这个firstTopic就类似于ActiveMQ中的topic概念。如右图所示,如果3个消费者都属于同一个
group,那么此事firstTopic就是一个Queue的概念


enable.auto.commit
消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合
auto.commit.interval.ms控制自动提交的频率。
当然,我们也可以通过consumer.commitSync()的方式实现手动提交


kakfa消费端pull阻塞等待新消息产生设置?


kafka幂等性以及事务原理分析
参考:https://www.freesion.com/article/8929453461/

fetch.max.wait.ms时间到达后如果还没有足够的消息那么你也给我个回复


#查看某个组的具体消费情况
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.10.33:9092,192.168.10.34:9092 --describe --group dsz-consumer-group-1

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-89 0 12 12 0 dsz-consumer-89-3-674ff44a-715d-4610-bde9-1e95dbfbee7b /192.168.10.2 dsz-consumer-89-3
test-topic-89 1 0 0 0 dsz-consumer-89-3-674ff44a-715d-4610-bde9-1e95dbfbee7b /192.168.10.2 dsz-consumer-89-3


#查看位移日志
kafka-console-consumer.sh --topic __consumer_offsets --partition 8 -- bootstrap-server 192.168.10.33:9092,192.168.10.34:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

#kafka消费者的三种模式(最多/最少/恰好消费一次)

#rebalance监听器


#Kafka offset管理及存储模型
https://blog.csdn.net/qq_43792385/article/details/108003320


#Kafka的Rebalance机制可能造成的影响及解决方案
https://blog.csdn.net/lzxlfly/article/details/106246879

#新消费组GroupID latest与earliest区别以及使用场景?
1、先用spring.kafka.consumer.auto-offset-reset=latest
2021-06-03 09:29:50.596 INFO 28820 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2021-06-03 09:29:50.597 INFO 28820 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2021-06-03 09:29:50.598 INFO 28820 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1622683790593
2021-06-03 09:29:50.602 INFO 28820 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Subscribed to topic(s): test-topic-89
2021-06-03 09:29:50.627 INFO 28820 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-06-03 09:29:50.712 INFO 28820 --- [ main] c.k.consumer.KafkaConsumerApplication : Started KafkaConsumerApplication in 2.606 seconds (JVM running for 4.452)
2021-06-03 09:29:50.714 INFO 28820 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state LivenessState changed to CORRECT
2021-06-03 09:29:50.718 INFO 28820 --- [ main] o.s.b.a.ApplicationAvailabilityBean : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2021-06-03 09:29:51.099 INFO 28820 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Cluster ID: -FguYa6mTfKqyk8NPyWbsw
2021-06-03 09:29:51.101 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Discovered group coordinator 192.168.10.33:9092 (id: 2147483646 rack: null)
2021-06-03 09:29:51.102 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] (Re-)joining group
2021-06-03 09:29:51.123 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Successfully joined group with generation Generation{generationId=1, memberId='consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1-04175f64-f340-4d81-935c-e96806e6d111', protocol='range'}
2021-06-03 09:29:51.124 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Finished assignment for group at generation 1: {consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1-04175f64-f340-4d81-935c-e96806e6d111=Assignment(partitions=[test-topic-89-0, test-topic-89-1])}
2021-06-03 09:29:51.137 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Successfully synced group in generation Generation{generationId=1, memberId='consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1-04175f64-f340-4d81-935c-e96806e6d111', protocol='range'}
2021-06-03 09:29:51.137 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Notifying assignor about the new Assignment(partitions=[test-topic-89-0, test-topic-89-1])
2021-06-03 09:29:51.141 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Adding newly assigned partitions: test-topic-89-1, test-topic-89-0
2021-06-03 09:29:51.150 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Found no committed offset for partition test-topic-89-1
2021-06-03 09:29:51.150 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Found no committed offset for partition test-topic-89-0
2021-06-03 09:29:51.157 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Found no committed offset for partition test-topic-89-1
2021-06-03 09:29:51.157 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Found no committed offset for partition test-topic-89-0
2021-06-03 09:29:51.178 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Resetting offset for partition test-topic-89-1 to position FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.10.34:9092 (id: 2 rack: null)], epoch=absent}}.
2021-06-03 09:29:51.179 INFO 28820 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-com.kafka.consumer.consumer.TestMsgConsumer-1-1, groupId=com.kafka.consumer.consumer.TestMsgConsumer-1] Resetting offset for partition test-topic-89-0 to position FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.10.33:9092 (id: 1 rack: null)], epoch=absent}}.
2021-06-03 09:29:51.188 INFO 28820 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : com.kafka.consumer.consumer.TestMsgConsumer-1: partitions assigned: [test-topic-89-1, test-topic-89-0]


#生产端:acks retries 事务 幂等性 发送batch 发送间隔时间 同步(异步)发送
#消费端:消费模式 rebalance(监听器) 重复消费问题 手动异步(同步提交,不会重试) 消费事务类型(已提交、未提交控制)

#消费提交模式 参考:https://blog.csdn.net/qq330983778/article/details/105937689
AckMode模式 作用
MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE 手动调用Acknowledgment.acknowledge()后立即提交
RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
COUNT_TIME TIME或COUNT 有一个条件满足时提交

#ISR数量限制
min.insync.replicas = n

replica.lag.time.max.ms:同步延迟时间

ISR数据保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state节点中


#zookeeper查询topic分区信息
get /brokers/topics/test-topic-fix/partitions/1/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0x10000010a
ctime = Mon May 31 07:59:27 UTC 2021
mZxid = 0x10000010a
mtime = Mon May 31 07:59:27 UTC 2021
pZxid = 0x10000010a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

#查看消息存储文件
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log

原文地址:https://www.cnblogs.com/dszazhy/p/14889235.html