kafka命令行操作

1)创建Topic

kafka-topics.sh  --create  --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3

kafka-topics.sh  --create  --zookeeper hadoop102:2181 --topic second --partitions 2 --replication-factor 3

--create 要进行的操作,这里表示创建
--bootstrap-server 把broker注册信息存储到kafka集群
--zookeeper	把broker注册信息存储到zookeeper集群
--topic 给主题一个名字
--partitions 设置主题分区数
--replication-factor 副本存储数量,不能大于kafka集群数量

2)查看所有的Topic

kafka-topics.sh --list --bootstrap-server hadoop102:9092

3)查看topic的详细信息

kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic second
#功能一样,下面的命令更加详细
kafka-topics.sh --describe --bootstrap-server hadoop102:9092
#ist属性表示节点数

4)修改topic

#把分区数量调大
kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 2

5)删除topic

当kafka是旧版本时,需要先更改server.propertites里的文件,再删除

delete.topic.enable=true
kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first

6)生产者

kafka-console-producer.sh --topic second  --broker-list hadoop102:9092
--topic 指定生产的数据流向的topic
--broker-list 指定生产者的数据流向的broker

7)消费者

kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092
--topic 执行要消费的分区
--boostrap-server 要消费的broker
#上面的写法只能检测当前消费者启动后的数据,如果想检测所有数据,使用如下写法
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning 

8)疑问:为什么启动一个新的消费者消费不到topic中已有的消息?

#下面写法会读取所有分区的数据,所以数据显示的顺序有可能会变
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning 

9)消费者组:

1)启动消费者指定配置文件,读取配置文件中的group.id配置的组名

这里的配置文件,指的是kafka-2.4.1/config/consumer.properties

进入配置文件后,可以通过group id配置组名

2)启动消费者时通过--group来指定组名
​ kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --group testgroup

10)从目录的角度看topic

1)通过存储的角度观察
offset文件存储在每台kafka节点的
datas目录下,
格式为__consumer_offsets-xxx

原文地址:https://www.cnblogs.com/traveller-hzq/p/14105128.html