topic的相关操作

1.建立topic

cd 进入kafka的安装根目录的bin目录下

执行:./kafka-topics.sh --zookeeper ip:port,ip:port,ip:port/kafka-test --create --topic topicname --partitions 4 -replication-factor 1 -config retention.ms=864000000

注解:./kafka-topics.sh>启动命令。ip:port>zk的ip和端口号。kafka-test>kafka在zk里面注册的组名。partitions>topic分区数量可自定义。replication-factor>备份数。retention.ms>数据保存时间。

2.删除topic

cd 进入kafka的安装根目录的bin目录下

执行:./kafka-topics.sh --delete --zookeeper ip:port,ip:port,ip:port/kafka-test --topic topicname

3.删除topic的log文件

cd 进入kafka的安装根目录的config目录下

执行:more server.properties。找到Log Basics下的log.dirs=/x/x/x 存储log的路径,找到对应目录删除对应的文件夹即可,topic分区数量决定文件夹的数量,通常为多个,如果有多台kafka,均匀分布。

4.修改topic的offset数,重启相关应用程序后可以从你指定的offset位置开始读数据。

进入zookeeper的命令行空间,通常是运行./Zkclent.sh 进入zk的操作命令行,

set /consumers/testgroup/offsets/test/0 666

set /consumers/你的kafka组/offsets/topic名字/分区 offset值

5.查看所有topic

cd 进入kafka的安装目录的bin目录下

执行:./kafka-topics.sh --list --zookeeper ip:port,ip:port,ip:port/kafka在zk里面注册的组名

6.计算kafka集群分区数量。

我们可以粗略地通过吞吐量来计算kafka集群的分区数量。假设对于单个partition,producer端的可达吞吐量为p,Consumer端的可达吞吐量为c,期望的目标吞吐量为t,那么集群所需要的partition数量至少为max(t/p,t/c)。在producer端,单个分区的吞吐量大小会受到批量大小、数据影响。

7.创建生产和消费者

cd 进入kafka的安装目录的bin目录下

生产者创建执行:./kafka-console-producer.sh --broker-list zk集群的ip和port --topic topicname

消费者创建执行:./kafka-console-consumer.sh --zookeeper ip:port,ip:port,ip:port --from-beginning --topic topicname

8.实时接收topic内的数据到指定文件内

kafka-console-consumer.sh --topic topicname --zookeeper ip:port,ip:port,ip:port/kafka-test >>/xxx/xxx/TEST.txt

也可只存最后10条

kafka-console-consumer.sh  --zookeeper ip:port,ip:port,ip:port/kafka-test --topic topicname --max-messages 10>>/xxx/xxx/TEST.txt

原文地址:https://www.cnblogs.com/lccyb/p/9406988.html