第2章、Kafka集群部署

一、环境准备

1.1、集群规划

broker0 broker1  broker2
zk zk zk
kafka  kafka  kafka 

1.2、jar包下载

  http://kafka.apache.org/downloads.html

二、Kafka集群部署

  • 解压安装包
tar -zxvf kafka_2.11-0.11.0.0.tgz -C  /usr/local
  • 修改解压后的文件名称
cd  /usr/local
mv kafka_2.11-0.11.0.0/ kafka
  • 在/opt/module/kafka目录下创建logs文件夹
mkdir -p /opt/module/kafka/logs
  • 修改配置文件
# 操作前要备份
vim /usr/local/kafka/config/zookeeper.properties

  输入以下内容:

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径    
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
  • 配置环境变量
sudo vim /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
  • 安装好之后可以通过虚拟机克隆,也可以通过脚本分发,

  注意:分发之后记得配置其他机器的环境变量;分别在kafka02和kafka03上修改配置文件/usr/local/kafka/config/zookeeper.properties中的broker.id=1、broker.id=2broker.id不得重复

  • 启动集群

  依次在kafka01、kafka02、kafka03节点上启动kafka

# 注意如果是自己配置的zk需要先启动zk,如果是kafka自带的zk也要先启动zk
# 下面是自动kafka自带的zk
bin/kafka-server-start.sh -daemon config/server.properties 
# 启动kafka
bin/kafka-server-start.sh config/server.properties
  • 后台启动kafka
# 后台启动kafka
cd /usr/local/kafka

nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

#如果使用kafka自带的zookeeper则需要先后台启动zookeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties 
  • 关闭集群
bin/kafka-server-stop.sh stop 
  • 集群启动脚本
#!/bin/bash

case $1 in
"start"){
                for i in broker0 broker1 broker2
                do
                        echo "*********$i**********"
                        ssh $i "/opt/local/kafka/bin/kafka-server-start.sh -daemon /opt/loacl/kafka/config/server.properties"
                done        
};;

"stop"){
                for i in broker0 broker1 broker2
                do
                        echo "*********$i**********"
                        ssh $i "/opt/local/kafka/bin/kafka-server-stop.sh"
                done        
};;

esac

三、Kafka命令行操作

  • Kafka命令行操作
bin/kafka-topics.sh --zookeeper broker2:2181 --list
  • 创建topic
[root@broker2 kafka]# bin/kafka-topics.sh --zookeeper broker2:2181 --create --replication-factor 2 --partitions 2 --topic first
Created topic first1.

  选项说明:

    • --topic 定义topic名
    • --replication-factor 定义副本数:副本数不能超过机器broker的数量
    • --partitions 定义分区数
  • 删除topic
$ bin/kafka-topics.sh --zookeeper broker2:2181   --delete --topic first

  需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

  • 发送消息
$ bin/kafka-console-producer.sh   --broker-list broker2:9092 --topic first
>hello world
  • 消费消息
 bin/kafka-console-consumer.sh --bootstrap-server broker2:9092 --topic first --from-beginning

  --from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

  • 查看某个Topic的详情
$ bin/kafka-topics.sh --zookeeper master:2181  --describe --topic first
原文地址:https://www.cnblogs.com/jdy1022/p/14393440.html