ELK 使用4-Kafka + zookpeer

一、zookpeer操作

  1、登录

    /application/elk/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181

  2、查看结构

    ls /

      

      上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的

    ls /brokers/topics

  3、获取配置文件

    get /brokers/ids/0

    

    get /brokers/topics/test2/partitions/0

    

二、kafka操作:

  1、增topic

    /application/elk/kafka/bin/kafka-topics.sh --zookeeper 192.168.30.41:2181,192.168.30.42:2181,192.168.30.43:2181 --create --topic test --replication-factor 3 --partitions 3

    注: partitions指定topic分区数,replication-factor指定topic每个分区的副本数

      a、partitions :分区数

        1)、控制topic将分片成多少个log。可以显示指定,如果不指定则会使用broker(server.properties)中的num.partitions配置的数量
        2)、虽然增加分区数可以提供kafka集群的吞吐量、但是过多的分区数或者或是单台服务器上的分区数过多,会增加不可用及延迟的风险。因为多的分区数,意味着需要打开更多的文件句柄、增加点到点的延时、增加客户端的内存消耗。
        3)、分区数也限制了consumer的并行度,即限制了并行consumer消息的线程数不能大于分区数
        4)、分区数也限制了producer发送消息是指定的分区。如创建topic时分区设置为1,producer发送消息时通过自定义的分区方法指定分区为2或以上的数都会出错的;这种情况可以通过alter –partitions 来增加分区数。
      b、replication-factor副本
        1)、replication factor 控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数。
        2)、如果没有在创建时显示指定或通过API向一个不存在的topic生产消息时会使用broker(server.properties)中的default.replication.factor配置的数量

  2、删topic

    /application/elk/kafka/bin/kafka-topics.sh --zookeeper 192.168.30.41:2181,192.168.30.42:2181,192.168.30.43:2181 --delete --topic test1

  3、查

    /application/elk/kafka/bin/kafka-topics.sh --zookeeper 192.168.30.41:2181,192.168.30.42:2181,192.168.30.43:2181 --describe --topic test 

    

      - Leader:负责处理消息的读和写,Leader是从所有节点中随机选择的

      - Replicas:列出了所有的副本节点,不管节点是否在服务中。

      - Isr:是正在服务中的节点

  4、查看所有topic

    /application/elk/kafka/bin/kafka-topics.sh --zookeeper 192.168.30.41:2181,192.168.30.42:2181,192.168.30.43:2181 --list

  5、生产消息

    /application/elk/kafka/bin/kafka-console-producer.sh --broker-list 192.168.30.41:9092,192.168.30.42:9092,192.168.30.43:9092 --topic wohaoshuai

    

  6、消费消息

    /application/elk/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.30.41:2181,192.168.30.42:2181,192.168.30.43:2181 --topic wohaoshuai --from-beginning

      

  7、查看某分区偏移量最大(小)值

    /application/elk/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic wohaoshuai  --time -1 --broker-list 192.168.30.41:9092,192.168.30.42:9092,192.168.30.43:9092 --partitions 0

    注: time为-1时表示最大值,time为-2时表示最小值

  8、增加topic分区数

    为wohaoshuai增加10个分区

    /application/elk/kafka/bin/kafka-topics.sh --zookeeper 192.168.30.41:2181,192.168.30.42:2181,192.168.30.43:2181 --alter --topic wohaoshuai --partitions 10

    

  9、通过group消费消息,当前版本group id需要在配置文件中设置,运行时需指明配置文件。

    /application/elk/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.41:9092,192.168.30.42:9092,192.168.30.43:9092 --topic wohaoshuai --consumer.config /application/elk/kafka/config/consumer.properties --offset 0 --partition 1

    /application/elk/kafka/config/consumer.properties文件中内容为

    

    注:指定分区时就只是取指定分区的消息

  10、查看所有消费组

    /application/elk/kafka/bin/kafka-consumer-groups.sh  --new-consumer --bootstrap-server 192.168.30.41:9092,192.168.30.42:9092,192.168.30.43:9092  --list

  11、查看某个消费组的情况

    /application/elk/kafka/bin/kafka-consumer-groups.sh  --new-consumer --bootstrap-server 192.168.30.41:9092,192.168.30.42:9092,192.168.30.43:9092  --group wohaoshuai_group --describe

    

    

    

原文地址:https://www.cnblogs.com/Presley-lpc/p/9946422.html