kafka实战

一、下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz

Java: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Java的安装http://czj4451.iteye.com/blog/2041159

二、启动kafka自带的zookeeper: ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &  (用&是为了能退出命令行)

启动Kafka:1)虚拟机启动需要把这个环境变量设小,用默认的话会有问题export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

2) ./bin/kafka-server-start.sh config/server.properties &

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

停止服务: ./bin/kafka-server-stop.sh      ./bin/zookeeper-server-stop.sh

三、构造一个跨虚拟机的、只有一个kafka服务的环境

1)两个在一个局域网的Ubuntu虚拟机(一个叫master,一个叫slaver1)

2)一个虚拟机(master)上面起zookeeper、kafka服务、kafka-producer;

要修改下config/server.properties中的listeners(效果同之前的版本的host.name及port:注意绑定host.name,否则可能出现莫名其妙的错误如consumer找不到broker。这个host.name是Kafka的server的机器名字,会注册到Zookeeper中)

虚拟机ip是10.0.0.5,不改的话,listeners默认取值localhost

producer代码是

 

3)另一个虚拟机(slaver1)上起kafka-consumer

代码是

==============================

 kafka shell基本命令:

http://www.cnblogs.com/xiaodf/p/6093261.html

http://orchome.com/35

-1)创建topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   --partitions 1 --topic Hello-Kafka

0)查看所有topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
1)查看某个topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic haha
2)查看topic某分区偏移量
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic haha  --time -1 --broker-list localhost:9092 --partitions 0
3)列出所有主题中的所有用户组:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
如果你使用老的高级消费者并存储分组元数据在zookeeper(即。offsets.storage=zookeeper)通过--zookeeper,而不是bootstrap-server:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
4)查看消费进度
 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group wind
如果你使用老的高级消费者并存储分组元数据在zookeeper(即。offsets.storage=zookeeper)通过--zookeeper,而不是bootstrap-server:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-consumer-25010 --zookeeper localhost:2181

topic:创建时topic名称

partition:分区编号

offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
Created:该partition创建时间

=============注意事项======

1)如果需要在其他节点作为客户端使用指令连接kafka broker,则需要注意设置kafka broker 配置文件中 host.name 参数为监听的IP地址

原文地址:https://www.cnblogs.com/testzcy/p/7514451.html