Kafka集群搭建及操作

一、配置单机Kafka

  Kafka官网:http://kafka.apache.org/

  1、下载&解压

wget https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz
tar -zxvf kafka_2.12-2.7.0.tgz

  2、修改配置文件

vi /root/rj/kafka/kafka_2.12-2.7.0/config/server.properties

  修改内容

broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/tmp/kafka-logs1
num.partitions=1
zookeeper.connect=127.0.0.1:2181

  3、启动&停止kafka

#启动
bin/kafka-server-start.sh -daemon config/server.properties
#停止
bin/kafka-server-stop.sh

  启动时加-daemon参数,是为了让kafka以守护进程的方式启动。

二、搭建Kafka集群

  1、搭建三台kafka服务器

  类似上面的单机搭建,配置信息如下:

    这里要特殊说明一下几个参数:

      broker.id:用来保证集群中的每一个kafka服务器都有一个broker

      listeners:本虚拟机ip及端口

      zookeeper.connect:zk服务器或zk集群

broker.id=0
listeners=PLAINTEXT://192.168.206.131:9092
log.dirs=/tmp/kafka-logs
num.partitions=1
zookeeper.connect=192.168.206.128:2181,192.168.206.129:2181,192.168.206.130:2181,192.168.206.134:2181

  2、分别启动三台服务器

bin/kafka-server-start.sh -daemon config/server.properties

   搭建完成

三、搭建Kafka伪集群

  1、复制两个server.properties配置文件

[root@lcl-aliyun kafka_2.12-2.7.0]# cp config/server.properties config/server2.properties
[root@lcl-aliyun kafka_2.12-2.7.0]# cp config/server.properties config/server3.properties

  2、修改配置文件

  主要修改broker.id和端口号以及日志存放路径

broker.id=1
listeners=PLAINTEXT://127.0.0.1:9093
log.dirs=/tmp/kafka-logs2
num.partitions=1
zookeeper.connect=127.0.0.1:2181
broker.id=2
listeners=PLAINTEXT://127.0.0.1:9094
log.dirs=/tmp/kafka-logs3
num.partitions=1
zookeeper.connect=127.0.0.1:2181

  3、分别启动三个kafka

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server1.properties
bin/kafka-server-start.sh -daemon config/server2.properties

四、Kafka操作

  创建topic&查看topic

###创建topic
[root@localhost kafka_2.12-2.7.0]# bin/kafka-topics.sh --create --bootstrap-server 192.168.206.132:9092 --replication-factor 1 --partitions 1 --topic lcltest
Created topic lcltest.

###查看topic
[root@localhost kafka_2.12-2.7.0]# bin/kafka-topics.sh --list --bootstrap-server 192.168.206.132:9092
lcltest
test

  发送消息

[root@localhost kafka_2.12-2.7.0]# bin/kafka-console-producer.sh --broker-list 192.168.206.132:9092 --topic lcltest
>lcl
>qmm
>love

  接收消息

[root@localhost kafka_2.12-2.7.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.206.131:9092 --topic lcltest --from-beginning --group mygroup
lcl
qmm
love

  删除topic(由于上面已经有消息消费,因此会自动创建一个__consumer_offsets的topic)

[root@localhost kafka_2.12-2.7.0]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.206.132:9092 --topic test
[root@localhost kafka_2.12-2.7.0]# bin/kafka-topics.sh --list --bootstrap-server 192.168.206.132:9092
__consumer_offsets
lcltest

五、Kafka日志

  1、一分区一备份

  分区是指topic会存在几台服务器上,备份是指同时有几个相同的topic编号,例如,设置分区为3,则topic会被均分为三份,分别为topicname-0、topicname-1、topicname-2,如果备份为1,那么以上三个topic分区,一台服务器上一个,如果备份为2,那么一台服务器上出现两个,如果备份为3,那么每台服务器上都是全量的topicname。

  Kafka的启动日志存放在logs目录中的server.log文件中

cat /root/rj/kafka/kafka_2.12-2.7.0/logs/server.log

  kafka的消息日志存放在/tem/kafka_logs目录中(以配置文件配置为准)

  以上述的lcltest这个topic为例,命令创建了一个备份和一个分区

   查看是只有一台服务器有lcltest这个topic,这说明只有一个分区,同时,这一台服务器中也只有一个lcltest-0的topic,这说明只有一个备份

   2、多分区多备份

  下面就模拟两个topic,一个topic为3分区3备份(topic:threerepli),一个topic为3分区2备份(topic:tworepli)

   创建topic

[root@localhost kafka_2.12-2.7.0]# bin/kafka-topics.sh --create --bootstrap-server 192.168.206.132:9092 --replication-factor 3 --partitions 3 --topic threerepli
Created topic threerepli.
[root@localhost kafka_2.12-2.7.0]# bin/kafka-topics.sh --create --bootstrap-server 192.168.206.132:9092 --replication-factor 3 --partitions 2 --topic tworepli
Created topic tworepli.

  分别来查看三台服务器的分配情况如下所示,可以看到,3分区3备份的topic(threerepli)在每台服务器上都是全量存在,而3分区2备份的topic(tworepli)在每一台服务器上存在两个,这样,有一台服务器宕机,2备份的集群仍然会存在全量的数据,而3备份的情况是有两台服务器宕机,其仍然保存着全量的数据。

   3、查看分区与备份在zk中的信息

    首先使用zkCli.sh命令连接上zk(直接在zk服务器上输入zkCli.sh即可)

    查看broker目录

[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, seqid, topics]

    查看/brokers/ids目录,用于查看集群中各个主机的broker-id列表

[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[0, 1, 2]

    查看/brokers/ids/id目录,id为具体的值,用于查看当前主机信息

[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.206.131:9092"],"jmx_port":-1,"features":{},"host":"192.168.206.131","timestamp":"1616428178944","port":9092,"version":5}

    查看/brokers/topics,用于查看broker下所有的topic

[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets, lcltest, lcltest123, threerepli, tworepli]

    查看/brokers/topics/topic/partitions(其中topic为实际的topic名称)用于查看分区情况

[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/threerepli/partitions
[0, 1, 2]

    查看/brokers/topics/topic/partitions/partition/state(其中topic为实际的topic名称,partition为分区编号),用于查看分区中具体的情况

[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/threerepli/partitions/0/state
{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,0]}

  4、查看段segment

    segment是一个逻辑文件,由两类物理文件组成,分别为index文件和log文件,其中log文件中存放的是实际的消息,而index文件中存放的是log文件中消息的索引。

[root@localhost threerepli-2]# pwd
/tmp/kafka-logs/threerepli-2
[root@localhost threerepli-2]# ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint

     对于segment中的log文件,是不能直接通过cat命令查看的,而是需要通过kafka自带的一个工具查看。

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/threerepli-2/00000000000000000000.log --print-data-log

    一个用户的一个主题会被提交到一个__consumer_offset分区中,使用主题字符串的hash值与50取模,就是分区索引。

------------------------------------------------------------------
-----------------------------------------------------------
---------------------------------------------
朦胧的夜 留笔~~
原文地址:https://www.cnblogs.com/liconglong/p/14563631.html