【Kafka】Kafka在ZooKeeper中的存储

一、Kafka在ZooKeeper中存储结构图

  

二、分析

2.1 topic注册信息

  /brokers/topics/[topic]:存储某个topic的partitions所有分配信息

[zk: 127.0.0.1:2181(CONNECTED) 1] ls /brokers/topics
[__consumer_offsets, first, second, test, test-more-replication] 
[zk: 127.0.0.1:2181(CONNECTED) 2] get -s /brokers/topics/test
{"version":2,"partitions":{"0":[2],"1":[0]},"adding_replicas":{},"removing_replicas":{}}
cZxid = 0x40000015b
ctime = Mon May 24 22:25:16 CST 2021
mZxid = 0x2700000430
mtime = Wed Jun 02 14:56:53 CST 2021
pZxid = 0x40000015d
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 88
numChildren = 1
[zk: 127.0.0.1:2181(CONNECTED) 3] 
 说明:
Schema:
{ "version": "版本编号目前固定为数字1", "partitions": {"partitionId编号": [ 同步副本组brokerId列表],"partitionId编号": [同步副本组brokerId列表], .......}}
Example:
{"version": 1,"partitions": {"2": [2, 1, 0],"1": [1, 0, 2],"0": [0, 2, 1],}}

2.2 partition状态信息

  /brokers/topics/[topic]/partitions/[0...N]  其中[0..N]表示partition索引号

  /brokers/topics/[topic]/partitions/[partitionId]/state

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /brokers/topics/test/partitions
[0, 1]
[zk: 127.0.0.1:2181(CONNECTED) 4] get -s /brokers/topics/test/partitions/0/state
{"controller_epoch":61,"leader":2,"version":1,"leader_epoch":64,"isr":[2]}
cZxid = 0x40000015f
ctime = Mon May 24 22:25:16 CST 2021
mZxid = 0x3000000300
mtime = Wed Jun 09 23:00:07 CST 2021
pZxid = 0x40000015f
cversion = 0
dataVersion = 64
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 74
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 5] 

  说明:

Schema:
{"controller_epoch": 表示kafka集群中的中央控制器选举次数,"leader": 表示该partition选举leader的brokerId,"version": 版本编号默认为1,
"leader_epoch": 该partition leader选举次数,"isr": [同步副本组brokerId列表]}
Example:
{"controller_epoch":61,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2,1]}

2.3 Broker注册信息

  /brokers/ids/[0...N]                 

  每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)

[zk: 127.0.0.1:2181(CONNECTED) 7] ls /brokers/ids
[0, 1, 2]
[zk: 127.0.0.1:2181(CONNECTED) 8] get -s /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"port":9092,"host":"127.0.0.1","version":4,"timestamp":"1623250802168"}
cZxid = 0x30000002a0
ctime = Wed Jun 09 23:00:02 CST 2021
mZxid = 0x30000002a0
mtime = Wed Jun 09 23:00:02 CST 2021
pZxid = 0x30000002a0
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x3000003b9290024
dataLength = 200
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 9] 

  说明:

Schema:
{"jmx_port": jmx端口号,"timestamp": kafka broker初始启动时的时间戳,"host": 主机名或ip地址,"version": 版本编号默认为1,
"port": kafka broker的服务端端口号,由server.properties中参数port确定}
Example:
{"jmx_port":-1,"host":"192.168.100.21","timestamp":"1533452008040","port":9092,"version":4}

2.4 Controller epoch

  /controller_epoch -->  int (epoch)   

  此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1; 

[zk: 127.0.0.1:2181(CONNECTED) 10] get -s /controller_epoch
61
cZxid = 0x400000140
ctime = Mon May 24 22:24:05 CST 2021
mZxid = 0x30000002a1
mtime = Wed Jun 09 23:00:03 CST 2021
pZxid = 0x400000140
cversion = 0
dataVersion = 61
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 11] 

2.5 Controller注册信息

  /controller -> int (broker id of the controller)  存储center controller中央控制器所在kafka broker的信息

[zk: 127.0.0.1:2181(CONNECTED) 11] get -s /controller
{"version":1,"brokerid":0,"timestamp":"1623250803161"}
cZxid = 0x30000002a1
ctime = Wed Jun 09 23:00:03 CST 2021
mZxid = 0x30000002a1
mtime = Wed Jun 09 23:00:03 CST 2021
pZxid = 0x30000002a1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3000003b9290024
dataLength = 54
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 12] 

  参考:https://www.cnblogs.com/frankdeng/p/9310713.html

原文地址:https://www.cnblogs.com/h--d/p/14871374.html