linux部署kafka环境

一、单节点环境

部署kafka,需要先部署JDK与zookeeper

1、单节点zookeeper

官网下载zookeeper最新版apache-zookeeper-3.5.7-bin.tar.gz

tar -zxvf  apache-zookeeper-3.5.7-bin.tar.gz
cd  apache-zookeeper-3.5.7-bin/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg  //指定跟节点路径dataDir
bin/zkServer.sh start  //STARTED启动成功
telnet 127.0.0.1 2181 //测试

 2、单节点kafka

官网下载kafka最新版kafka_2.11-2.4.0.tgz

tar -zxvf kafka_2.11-2.4.0.tgz
cd kafka_2.11-2.4.0/config/
vim server.properties //修改logs路径
bin/kafka-server-start.sh ./../config/server.properties //...started启动成功

二、多节点环境

1、多节点zookeeper集群

多节点配置文件zoo.cfg

tickTime=2000  //最小时间单位,测量心跳时间、超时等
initLimit=10      //follower初始时连接leader节点的最大tick次数
syncLimit=5     //follower节点与leader节点进行同步的最大时间
dataDir=/home/wq/zookeeper/cluster/2191   //内存快照持久化另外两个2192、2193
clientPort=2191  //监听端口
server.1=mcip:2888:3888 //serverX=host:port1:port2
server.2=mcip:2889:3889 //X:zookeeper的myid,host:zookeeper主机,port1:followe节点连接leader节点,port2:leader选举
server.3=mcip:2890:3890 //这里是同一虚拟机,所以指定port不同,多台实体服务器建议设置port一致。

小技巧:由于连接WiFi,虚拟机ip不停的在变,修改一下/etc/hosts文件,所有中间件的配置文件设置本机ip时,用mcip替换。换wifi了,修改一下hosts文件即可。

  

 在各自dataDir下创建myid文件内容与上面server.X中X保持一致

touch myid
vim myid

另外由于java版本也可通过jps -m查看启动结果

cluster.sh脚本

#!bin/bash
/usr/etc/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1 /home/wq/zookeeper/cluster/2191/zoo.cfg
/usr/etc/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1 /home/wq/zookeeper/cluster/2192/zoo.cfg
/usr/etc/apache-zookeeper-3.5.7-bin/bin/zkServer.sh $1 /home/wq/zookeeper/cluster/2193/zoo.cfg

 2、多节点kafka集群

多节点配置文件server.properties

broker.id=3  //需要设置不同1/2/3
listeners=PLAINTEXT://mcip:9093  //9091/9092/9093  
log.dirs=/home/wq/kafka/cluster/logs_9093    //日志路径文件夹必须为空且三个节点不能一样logs_9091/logs_9092/logs_9093,是kafka持久化消息的目录
zookeeper.connect=mcip:2191,mcip:2192,mcip:2193
zookeeper.connection.timeout.ms=6000

 3、测试topic创建与删除

kafka-topics.sh脚本:topic创建、查看、删除
/usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --create --topic topic-test --partitions 3 --replication-factor 3
/usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --list topic-test
/usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --describe --topic topic-test
/usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --delete --topic topic-test

 4、测试消息发送和消费

kafka-console-producer脚本和kafka-console-consumer脚本

//需要开两个终端
/usr/etc/kafka_2.11-2.4.0/bin/kafka-console-producer.sh --broker-list mcip:9091,mcip:9092,mcip:9093 --topic topic-test
/usr/etc/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server mcip:9091,mcip:9092,mcip:9093 --topic topic-test --from-beginning

 

5、创建cluster.sh脚本,

#!bin/bash
if [ "$1" = "start" ] ;then
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /home/wq/kafka/cluster/conf/server1.properties
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /home/wq/kafka/cluster/conf/server2.properties
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /home/wq/kafka/cluster/conf/server3.properties
elif [ "$1" = "stop" ] ;then
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-stop.sh /home/wq/kafka/cluster/conf/server1.properties
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-stop.sh /home/wq/kafka/cluster/conf/server2.properties
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-server-stop.sh /home/wq/kafka/cluster/conf/server3.properties
elif [ "$1" = "topic" ] ;then
    if [ "$2" = "create" ] ;then
        /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 --topic $3 --partitions $4 --replication-factor $5
    elif [ "$2" = "list" ] ;then
        /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 $3
    elif [ "$2" = "describe" ] ;then
         /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 --topic $3
    elif [ "$2" = "delete" ] ;then
        /usr/etc/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper mcip:2191,mcip:2192,mcip:2193 --$2 --topic $3
    else
        echo "should cluster.sh <topic> <create,list,describe,delete> <topic-name> <partitionsNum>"
    fi
elif [ "$1" = "producer" ] ;then
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-console-producer.sh --broker-list mcip:9091,mcip:9092,mcip:9093 --topic $2
elif [ "$1" = "consumer" ] ;then
    /usr/etc/kafka_2.11-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server mcip:9091,mcip:9092,mcip:9093 --topic $2 --from-beginning
else
    echo "should cluster.sh <start,stop,topic,producer,consumer>"
fi

 集群结构图

① 从上面kafka启动配置文件server.properties,可以看出kafka启动是不知道其他kafka服务的,通过注册

ZooKeeper来完成kafka集群,由Zookeeper来同一管理各节点状态及通信。伸缩性强,新增kafka服务仅需要注册到ZooKeeper中就行了。

②关于ZooKeeper集群采用的leader-follower模型,半数以上服务正常运行,则集群正常提供服务。所以经常采用奇数个服务组成集群。

例如:5个ZooKeeper服务组成集群,至少需要三个follower服务器才能选举出leader,为保证集群正常,允许宕机2个

           4个ZooKeeper服务组成集群,至少需要三个follower服务器才能选举出leader,为保证集群正常,允许宕机1个

           6个ZooKeeper服务组成集群,至少需要四个follower服务器才能选举出leader,为保证集群正常,允许宕机2个

leader-follower模型:仅leader对外服务,follower作用仅仅是同步leader数据和当leader宕机时,重新选举leader。

5个相比于4个,多允许宕机数,集群稳健性提升。

5个相比于6个,集群效果是一样的,但是节省了一个服务。

原文地址:https://www.cnblogs.com/wqff-biubiu/p/12318701.html