1)创建Topic
kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3
kafka-topics.sh --create --zookeeper hadoop102:2181 --topic second --partitions 2 --replication-factor 3
--create 要进行的操作,这里表示创建
--bootstrap-server 把broker注册信息存储到kafka集群
--zookeeper 把broker注册信息存储到zookeeper集群
--topic 给主题一个名字
--partitions 设置主题分区数
--replication-factor 副本存储数量,不能大于kafka集群数量
2)查看所有的Topic
kafka-topics.sh --list --bootstrap-server hadoop102:9092
3)查看topic的详细信息
kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic second
#功能一样,下面的命令更加详细
kafka-topics.sh --describe --bootstrap-server hadoop102:9092
#ist属性表示节点数
4)修改topic
#把分区数量调大
kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 2
5)删除topic
当kafka是旧版本时,需要先更改server.propertites里的文件,再删除
delete.topic.enable=true
kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first
6)生产者
kafka-console-producer.sh --topic second --broker-list hadoop102:9092
--topic 指定生产的数据流向的topic
--broker-list 指定生产者的数据流向的broker
7)消费者
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092
--topic 执行要消费的分区
--boostrap-server 要消费的broker
#上面的写法只能检测当前消费者启动后的数据,如果想检测所有数据,使用如下写法
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning
8)疑问:为什么启动一个新的消费者消费不到topic中已有的消息?
#下面写法会读取所有分区的数据,所以数据显示的顺序有可能会变
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning
9)消费者组:
1)启动消费者指定配置文件,读取配置文件中的group.id配置的组名
这里的配置文件,指的是kafka-2.4.1/config/consumer.properties
进入配置文件后,可以通过group id配置组名
2)启动消费者时通过--group来指定组名
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --group testgroup
10)从目录的角度看topic
1)通过存储的角度观察
offset文件存储在每台kafka节点的
datas目录下,
格式为__consumer_offsets-xxx