Kafka Installation

Installing Kafka:
We use three machines to build the Kafka cluster:
192.168.121.132   h201
192.168.121.131   h202
192.168.121.130   h203

Download kafka_2.10-0.10.1.0.tgz

[hadoop@h201 ~]$ tar -zxvf kafka_2.10-0.10.1.0.tgz
[hadoop@h201 ~]$ cd kafka_2.10-0.10.1.0
Edit the config file kafka_2.10-0.10.1.0/config/server.properties and change the following:
zookeeper.connect=h201:2181,h202:2181,h203:2181/kafka
Note: by default Kafka uses ZooKeeper's root path /, so Kafka's ZooKeeper data ends up scattered directly under the root. If other applications share the same ZooKeeper cluster, browsing the data becomes confusing, so it is strongly recommended to specify a chroot path, given directly in the zookeeper.connect setting as above.

Connect to any one of the ZooKeeper servers with the following commands:
[hadoop@h201 ~]$ cd zookeeper-3.4.5-cdh5.5.2/
[hadoop@h201 zookeeper-3.4.5-cdh5.5.2]$ bin/zkCli.sh

connect h201:2181

create /kafka
(the paths below are populated automatically once the brokers are started and topics exist; see the example output that follows)
ls /kafka/brokers/topics
ls /kafka/brokers/ids
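Once all three brokers have registered, a zkCli session against the chroot should look roughly like the following (the exact set of znodes varies slightly by Kafka version):

[zk: h201:2181(CONNECTED) 1] ls /kafka
[controller, brokers, admin, isr_change_notification, consumers, config]
[zk: h201:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]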

Copy the Kafka directory to the other two machines:
[hadoop@h201 ~]$ scp -r kafka_2.10-0.10.1.0/ h202:/home/hadoop/
[hadoop@h201 ~]$ scp -r kafka_2.10-0.10.1.0/ h203:/home/hadoop/

Modify broker.id in server.properties; all three machines need this change.
Kafka requires every broker's id to be unique across the whole cluster, so this value must be adjusted on each machine. (On a single host you can simulate a distributed cluster by starting several broker processes, but the broker ids must still be unique and some directory settings in each configuration must also differ.) A scripted alternative to editing each file follows the commands below.
[hadoop@h201 ~]$ vi kafka_2.10-0.10.1.0/config/server.properties
broker.id=0
[hadoop@h202 ~]$ vi kafka_2.10-0.10.1.0/config/server.properties
broker.id=1
[hadoop@h203 ~]$ vi kafka_2.10-0.10.1.0/config/server.properties
broker.id=2
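If you prefer not to edit each file by hand, the same change can be scripted; a minimal sketch for h202 (adjust the id on each host):

[hadoop@h202 ~]$ sed -i 's/^broker\.id=.*/broker.id=1/' kafka_2.10-0.10.1.0/config/server.properties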

First start the ZooKeeper cluster, then start Kafka on all three machines:

kafka-server-start.sh -daemon /home/hadoop/kafka_2.10-0.10.1.0/config/server.properties

To shut a broker down later, run kafka-server-stop.sh on that machine (the script takes no arguments).
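To confirm that each broker came up, jps should list a Kafka process on every machine; the output will look roughly like this (process ids are examples; QuorumPeerMain is the ZooKeeper server running on the same host):

[hadoop@h201 ~]$ jps
2765 QuorumPeerMain
3012 Kafka
3190 Jps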

Create a topic:

bin/kafka-topics.sh --create --zookeeper h201:2181/kafka --replication-factor 3 --partitions 5 --topic data

View the created topic:

kafka-topics.sh --describe --zookeeper h201:2181/kafka --topic data

kafka-topics.sh --zookeeper h201:2181/kafka --list
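For illustration, the describe command above prints one summary line plus one line per partition, roughly like the following (the actual leader and replica assignments will differ on your cluster):

Topic:data    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: data    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: data    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: data    Partition: 2    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    ...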

In the describe output, Partition, Leader, Replicas, and Isr mean the following:
Partition: the partition number
Leader   : the node responsible for all reads and writes of that partition
Replicas : the list of nodes that replicate the partition's log
Isr      : the "in-sync" replicas, the subset of replicas that are currently alive and caught up, and therefore eligible to become leader

Delete a topic:
On all three machines in the cluster, edit server.properties and add the following setting, then restart the brokers (without it, a topic is only marked for deletion, not actually removed):
delete.topic.enable=true

kafka-topics.sh --delete --zookeeper h201:2181/kafka --topic data
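Afterwards the topic should disappear from the listing; if delete.topic.enable was not enabled, it will instead show up as "marked for deletion":

kafka-topics.sh --zookeeper h201:2181/kafka --list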

Built-in test tools

Before benchmarking, verify the pipeline end to end with the console producer and consumer; lines typed into the producer terminal should appear in the consumer terminal:

kafka-console-producer.sh --broker-list h201:9092 --topic data

kafka-console-consumer.sh --zookeeper h201:2181/kafka --topic data --from-beginning

Producer performance test (throughput):
[hadoop@h202 config]$ kafka-producer-perf-test.sh --num-records 100000 --record-size 1000 --topic data --producer-props bootstrap.servers=h201:9092,h202:9092,h203:9092 acks=all --throughput 1000
4399 records sent, 860.2 records/sec (0.82 MB/sec), 64.0 ms avg latency, 725.0 max latency.
2857 records sent, 513.4 records/sec (0.49 MB/sec), 237.0 ms avg latency, 3464.0 max latency.
4461 records sent, 892.0 records/sec (0.85 MB/sec), 3908.1 ms avg latency, 6209.0 max latency.
7809 records sent, 1561.8 records/sec (1.49 MB/sec), 2653.6 ms avg latency, 5225.0 max latency.
6177 records sent, 1235.2 records/sec (1.18 MB/sec), 961.4 ms avg latency, 2343.0 max latency.
4991 records sent, 976.7 records/sec (0.93 MB/sec), 218.8 ms avg latency, 1001.0 max latency.
4444 records sent, 843.1 records/sec (0.80 MB/sec), 390.9 ms avg latency, 2136.0 max latency.
5867 records sent, 1169.0 records/sec (1.11 MB/sec), 149.5 ms avg latency, 1086.0 max latency.
4713 records sent, 810.4 records/sec (0.77 MB/sec), 95.1 ms avg latency, 1304.0 max latency.
5235 records sent, 1046.8 records/sec (1.00 MB/sec), 1083.5 ms avg latency, 2910.0 max latency.
6041 records sent, 1122.9 records/sec (1.07 MB/sec), 411.3 ms avg latency, 1252.0 max latency.
5402 records sent, 1075.5 records/sec (1.03 MB/sec), 92.3 ms avg latency, 471.0 max latency.
5019 records sent, 1003.2 records/sec (0.96 MB/sec), 46.6 ms avg latency, 331.0 max latency.
4971 records sent, 994.2 records/sec (0.95 MB/sec), 142.3 ms avg latency, 677.0 max latency.
5026 records sent, 1005.0 records/sec (0.96 MB/sec), 76.9 ms avg latency, 555.0 max latency.
4461 records sent, 712.4 records/sec (0.68 MB/sec), 858.5 ms avg latency, 2328.0 max latency.
6240 records sent, 1247.8 records/sec (1.19 MB/sec), 596.2 ms avg latency, 2434.0 max latency.
5564 records sent, 1112.6 records/sec (1.06 MB/sec), 306.2 ms avg latency, 1089.0 max latency.
4998 records sent, 999.6 records/sec (0.95 MB/sec), 110.0 ms avg latency, 1215.0 max latency.
100000 records sent, 999.360409 records/sec (0.95 MB/sec), 692.04 ms avg latency, 6209.00 ms max latency, 156 ms 50th, 4165 ms 95th, 5337 ms 99th, 5761 ms 99.9th.
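A quick sanity check on the summary line: --throughput 1000 caps the send rate at 1,000 records/s, so with 1,000-byte records the expected steady state is about 1,000 x 1,000 = 1,000,000 bytes/s ≈ 0.95 MB/s, which matches the reported 999.36 records/sec and 0.95 MB/sec.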

Consumer performance test:
[hadoop@h201 ~]$ kafka-consumer-perf-test.sh --zookeeper h201:2181/kafka --messages 100000 --topic data --threads 3
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2018-07-08 22:40:31:658, 2018-07-08 22:40:34:270, 9.8706, 3.7789, 3460, 1324.6554
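Reading the result row against the header above: roughly 9.87 MB (3,460 messages) were consumed in about 2.6 seconds, i.e. about 3.78 MB/s and 1,325 messages/s. Note that only 3,460 of the requested 100,000 messages were actually fetched before the run ended, so this single run is closer to a smoke test than a full benchmark.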

1. Producer test results and analysis

Analysis:
1) With batching and multiple threads, and without synchronous replication, producer throughput is quite high, reaching about 26 MB/s and more than 170,000 messages/s.
2) Batching and multi-threading both improve producer throughput noticeably.
3) The replication factor has a clear impact on throughput.
With synchronous replication, a replication factor of 2 lowers throughput by roughly 40% compared with a factor of 1 (i.e. no replication).
4) Synchronous (sync) sending causes a clear drop in performance; producer throughput falls noticeably in sync mode.
5) Compression vs. throughput:
With Gzip or Snappy compression, throughput actually dropped (the cause is still to be analyzed); Snappy performed better than Gzip. (A sketch for rerunning the test with different acks/compression settings follows this list.)
6) Partition count vs. throughput:
Increasing the number of partitions actually lowered producer throughput somewhat.
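To check points 3) to 5) on your own cluster, the same perf tool can be rerun with different producer properties; a minimal sketch (the chosen values are only examples):

kafka-producer-perf-test.sh --topic data --num-records 100000 --record-size 1000 --throughput -1 --producer-props bootstrap.servers=h201:9092,h202:9092,h203:9092 acks=1 compression.type=snappy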

2. Consumer test results and analysis

Analysis:

1) With a large number of partitions and threads, consumer throughput in the test scenario peaked at about 34 MB/s.
2) The replication factor has little effect: it does not influence the consumer throughput test, because the consumer reads from each partition's leader regardless of the replication factor. Likewise, consumer throughput does not depend on whether replication is synchronous or asynchronous.
3) Thread count and partition count vs. throughput:
When the partition count is large, increasing the thread count significantly improves consumer throughput.
Note, however, that the thread count should not exceed the partition count; otherwise you get "No broker partitions consumed by consumer" and the extra threads do not help throughput. (See the rerun sketch after this list.)
4) Batch size vs. throughput:
Changing the batch size has little effect on consumer throughput.
5) Compression vs. throughput:
Compression has little effect on consumer throughput.
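To exercise point 3) on this 5-partition topic, the thread count can be raised up to the partition count; a sketch:

kafka-consumer-perf-test.sh --zookeeper h201:2181/kafka --topic data --messages 100000 --threads 5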

Appendix: the tuned configuration file:

broker.id=1

listeners=PLAINTEXT://0.0.0.0:6667
advertised.listeners=PLAINTEXT://203.150.54.215:6667
port=6667
host.name=203.150.54.215

# Replication configurations
num.replica.fetchers=1
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
compression.codec=none
controller.socket.timeout.ms=30000
controller.message.queue.size=10
controlled.shutdown.enable=true
default.replication.factor=2

# Log configuration
num.partitions=1
num.recovery.threads.per.data.dir=1
message.max.bytes=1000000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
log.dirs=/mnt/kafka-logs/kafka00
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
# retain data for three days; can be shortened
log.retention.hours=72
# flush data to disk every 10 seconds
log.flush.interval.ms=10000
# flush policy for log data files: flush after this many messages
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=72
log.retention.check.interval.ms=300000
# at startup Kafka scans all data files under log.dirs in a single thread,
# so the segment size affects startup time
log.segment.bytes=1073741824

# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
zookeeper.connect=203.150.54.215:2181,203.150.54.216:2182,203.150.54.217:2183

# Socket server configuration
# roughly CPU cores + 1
num.io.threads=5
# roughly 2x CPU cores, at most 3x
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000

Original source: https://www.cnblogs.com/jieran/p/9310437.html