Centos7.5搭建3台kafka集群并安装kafka-manager

1、服务器规划

服务器名称 IP地址 角色
kafka-broker1 172.16.4.132 kafka节点1、zookeeper节点1、kafka-manager
kafka-broker2 172.16.4.133 kafka节点2、zookeeper节点2
kafka-broker3 172.16.4.134 kafka节点3、zookeeper节点3

2、服务器初始化

每个公司都不一样,建议谨慎操作。设置完重启下服务器

1、替换yum源
yum install wget -y
cp -r /etc/yum.repos.d /etc/yum.repos.d.bak
rm -f /etc/yum.repos.d/*.repo
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 
&& wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

yum clean all && yum makecache

2、安装常用命令
yum install lrzsz tree net-tools ntpdate lsof -y

3、同步时间并加入定时任务
ntpdate ntp1.aliyun.com
echo "*/15 * * * * /usr/sbin/ntpdate -u ntp1.aliyun.com >/dev/null 2>&1" >>/var/spool/cron/root

4、关闭selinux
setenforce 0 
&& sed -i 's/^SELINUX=.*$/SELINUX=disabled/' /etc/selinux/config 
&& getenforce

5、关闭防火墙
systemctl stop firewalld 
&& systemctl daemon-reload 
&& systemctl disable firewalld 
&& systemctl daemon-reload 
&& systemctl status firewalld

6、禁用UseDNS
sed -i 's/#UseDNS yes/UseDNS no/g' /etc/ssh/sshd_config
service sshd restart

7、设置系统参数
cat >> /etc/security/limits.conf <<EOF
# End of file
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF

8、优化内核参数(建议按照自己的业务配置)
vi /etc/sysct.conf

net.ipv4.ip_forward = 0
net.ipv4.conf.default.rp_filter = 1
net.ipv4.conf.default.accept_source_route = 0
kernel.sysrq = 0
kernel.core_uses_pid = 1
net.ipv4.tcp_syncookies = 1
kernel.msgmnb = 65536
kernel.msgmax = 65536
kernel.shmmax = 68719476736
kernel.shmall = 4294967296
net.ipv4.ip_forward = 0
net.ipv4.conf.default.rp_filter = 1
net.ipv4.conf.default.accept_source_route = 0
kernel.sysrq = 0
kernel.core_uses_pid = 1
kernel.msgmnb = 65536
kernel.msgmax = 65536
kernel.shmmax = 68719476736
kernel.shmall = 4294967296
net.ipv4.tcp_max_tw_buckets = 60000
net.ipv4.tcp_sack = 1
net.ipv4.tcp_window_scaling = 1
net.ipv4.tcp_rmem = 4096 87380 4194304
net.ipv4.tcp_wmem = 4096 16384 4194304
net.core.wmem_default = 8388608
net.core.rmem_default = 8388608
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.core.netdev_max_backlog = 500000
net.core.somaxconn = 262144
net.ipv4.tcp_max_orphans = 3276800
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 262144
net.ipv4.tcp_timestamps = 1
net.ipv4.tcp_synack_retries = 1
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_mem = 94500000 915000000 927000000
net.ipv4.tcp_fin_timeout = 1
net.ipv4.tcp_keepalive_time = 1200
net.ipv4.tcp_keepalive_probes=5 
net.ipv4.tcp_keepalive_intvl=15 
net.ipv4.ip_local_port_range = 1024 65535
net.nf_conntrack_max = 25000000
vm.swappiness = 0
vm.max_map_count=262144
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1

3、服务器配置的一些建议

1、CPU 24核
2、内存 64GB
3、磁盘 RAID10(追求性价比的公司可以不搭建RAID,使用普通磁盘组成存储空间即可。使用机械磁盘完全能够胜任Kafka线上环境。)

4、安装java

安装1.8.0_152版本

mkdir -p /usr/local/java
cd /usr/local/src && tar xvf jdk-8u152-linux-x64.tar.gz -C /usr/local/java

配置环境变量
vi /etc/profile

JAVA_HOME=/usr/local/java/jdk1.8.0_152
CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME CLASSPATH PATH

source /etc/profile

验证
java -version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)

5、安装zookeeper

1、ZooKeeper简介

ZooKeeper是一个开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现。ZooKeeper为分布式应用提供一致性服务,提供的功能包括:分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)、分布式锁(Distributed Lock)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。

ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

ZooKeeper集群角色说明

ZooKeeper主要有领导者(Leader)、跟随者(Follower)和观察者(Observer)三种角色。

角色 说明
领导者(Leader) 为客户端提供读和写的服务,负责投票的发起和决议,更新系统状态。
跟随者(Follower) 为客户端提供读服务,如果是写服务则转发给Leader。在选举过程中参与投票。
观察者(Observer) 为客户端提供读服务器,如果是写服务则转发给Leader。不参与选举过程中的投票,也不参与“过半写成功”策略。在不影响写性能的情况下提升集群的读性能。此角色于zookeeper3.3系列新增的角色。
#3台服务器上操作
创建目录和解压缩文件
mkdir -p /usr/local/data/zookeeper-3.4.11/data && mkdir -p /usr/local/data/zookeeper-3.4.11/dataLog

#3台服务器上操作
cd /usr/local/src && tar xvf zookeeper-3.4.10.tar.gz -C /usr/local

cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg

#3台服务器上操作
#编辑配置文件
> zoo.cfg
vi zoo.cfg

#写入以下内容并保存
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/data/zookeeper-3.4.11/data
dataLogDir=/usr/local/data/zookeeper-3.4.11/dataLog
clientPort=8181
autopurge.snapRetainCount=3
autopurge.purgeInterval=6
server.1=172.16.4.132:7888:8888
server.2=172.16.4.133:7888:8888
server.3=172.16.4.134:7888:8888

#配置节点标识
kafka-broker1(172.16.4.132)
echo "1" > /usr/local/data/zookeeper-3.4.11/data/myid

kafka-broker2(172.16.4.133)
echo "2" > /usr/local/data/zookeeper-3.4.11/data/myid

kafka-broker3(172.16.4.134)
echo "3" > /usr/local/data/zookeeper-3.4.11/data/myid

5.1 启动zookeeper

3台服务器上分别操作

cd /usr/local/zookeeper-3.4.11/bin
sh zkServer.sh start

#查看节点状态
[root@ser4-133 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: leader

[root@ser4-132 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: follower

[root@ser4-134 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: follower

5.2 备注

ZooKeeper常用配置项说明

配置项 名称 ken.io 的说明
tickTime CS通信心跳间隔 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每间隔 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位。
initLimit LF初始通信时限 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数
syncLimit LF同步通信时限 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数
dataDir 数据文件目录 Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里
dataLogDir 日志文件目录 Zookeeper保存日志文件的目录
clientPort 客户端连接端口 客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
server.N 服务器名称与地址 从N开始依次为:服务编号、服务地址、LF通信端口、选举端口;例如:server.1=172.16.4.132:7888:8888

6、安装kafka

1、Kafka简介

Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(ElasticsearchHadoop等)

Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

2、Kafka架构

Kafka架构图

在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

概念/对象 简单说明
Broker Kafka节点
Topic 主题,用来承载消息
Partition 分区,用于主题分片存储
Producer 生产者,向主题发布消息的应用
Consumer 消费者,从主题订阅消息的应用
Consumer Group 消费者组,由多个消费者组成
#3台服务器上操作
创建目录和解压缩文件
mkdir -p /data/kafka/data

#3台服务器上操作
cd /usr/local/src && tar xvf kafka_2.11-0.11.0.0.tgz -C /usr/local

cp /usr/local/kafka_2.11-0.11.0.0/config/server.properties /usr/local/kafka_2.11-0.11.0.0/config/server.properties.bak

>server.properties

kafka-broker1(172.16.4.132)

vi server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://172.16.4.132:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.4.132:8181,172.16.4.133:8181,172.16.4.134:8181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
=======================================
kafka-broker2(172.16.4.133)

vi server.properties

broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://172.16.4.133:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.4.132:8181,172.16.4.133:8181,172.16.4.134:8181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
=======================================
kafka-broker3(172.16.4.134)

vi server.properties

broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://172.16.4.134:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.4.132:8181,172.16.4.133:8181,172.16.4.134:8181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
==========================================

#加入环境变量,3台机器都操作
vi /etc/profile

#增加如下配置
KAFKA_HOME=/usr/local/kafka_2.11-0.11.0.0
PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

6.1 修改kafka默认jvm参数

默认是1G

#3台服务器操作
vi /usr/local/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh

#修改此处
export KAFKA_HEAP_OPTS="-Xmx4g -Xms4g"

6.2 启动kafka

3台服务器上操作

nohup /usr/local/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /usr/local/kafka_2.11-0.11.0.0/config/server.properties >>/usr/local/kafka-logs/kafka.log &

6.3 常用命令

#创建3个副本1个分区的topic

kafka-topics.sh --create --zookeeper 172.16.4.132:8181,172.16.4.133:8181,172.16.4.134:8181 --replication-factor 3 --partitions 1 --topic k8s-sale-html-sl-info

#查询某个topic
kafka-topics.sh --list --zookeeper  172.16.4.132:8181,172.16.4.133:8181|grep topic的名字

6.4 备注

Kafka常用Broker配置说明:

配置项 默认值/示例值 说明
broker.id 0 Broker唯一标识
listeners PLAINTEXT://172.16.4.132:9092 监听信息,PLAINTEXT表示明文传输
log.dirs kafka/logs kafka数据存放地址,可以填写多个。用”,”间隔
message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
num.partitions 1 默认分区数
log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
log.retention.hours 24 控制一个log保留时间,单位:小时
zookeeper.connect 172.16.4.132:8181 ZooKeeper服务器地址,多台用”,”间隔

7、安装kafka-manager

mv /home/manager/kafka-manager-1.3.3.7.zip /usr/local/src
unzip kafka-manager-1.3.3.7.zip 
mv kafka-manager-1.3.3.7 /usr/local/

cd /usr/local/kafka-manager-1.3.3.7/conf
cp application.conf application.conf.bak

#修改application.conf配置

vi application.conf

# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file.

# This is the main configuration file for the application.
# ~~~~~

# Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}

# The application languages
# ~~~~~
play.i18n.langs=["en"]

play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader

#kafka-manager.zkhosts="localhost:2181"
kafka-manager.zkhosts="172.16.4.132:8181,172.16.4.133:8181,172.16.4.134:8181" #修改为搭建的zk集群
kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}


basicAuthentication.enabled=true
basicAuthentication.username="用户"            #设置登录用户
basicAuthentication.password="密码" #设置登录密码
basicAuthentication.realm="Kafka-Manager"
basicAuthentication.excluded=["/api/health"] # ping the health of your instance without authentification

#启动kafka-manager
nohup /usr/local/kafka-manager-1.3.3.7/bin/kafka-manager >>/usr/local/kafka-logs/kafka-manager.log &

#访问地址

http://172.16.4.132:9000/

8、遇到的坑

1、目录没创建,启动失败
2、依赖包没安装,启动kafka-manager失败,不记得是哪个包了
3、kafka旧版本的命令和新版本的有些不一样,网上的文档谨慎操作

9、参考

https://ken.io/note/elk-deploy-guide
原文地址:https://www.cnblogs.com/uglyliu/p/12468991.html