Creating a Kafka cluster

Download Kafka

Download page: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

# wget http://mirror.bit.edu.cn/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
# tar xf kafka_2.11-0.11.0.0.tgz

Create configuration files for the three nodes in the Kafka directory

# cd kafka_2.11-0.11.0.0
# cd config
# touch s1.properties s2.properties s3.properties
# cat s1.properties
broker.id=1
host.name=192.168.7.203
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=false

# cat s2.properties
broker.id=2
host.name=192.168.7.203
port=9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=false

# cat s3.properties 
broker.id=3
host.name=192.168.7.203
port=9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs-3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=false
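
The three files above differ only in broker.id, port and log.dirs, so they could also be rendered from one template. The following is a minimal sketch under that assumption; HOST, SHARED and render_broker() are illustrative names chosen here, not part of Kafka, and the key order differs from the listings above (order does not matter in a properties file).

```python
# Sketch: render the three broker files from one template.
# HOST, SHARED and render_broker() are illustrative names, not Kafka tooling.
HOST = "192.168.7.203"
ZOOKEEPER = ",".join(f"{HOST}:{port}" for port in (2181, 2182, 2183))

# Settings that are identical in s1, s2 and s3 above.
SHARED = {
    "num.network.threads": 3,
    "num.io.threads": 8,
    "socket.send.buffer.bytes": 102400,
    "socket.receive.buffer.bytes": 102400,
    "socket.request.max.bytes": 104857600,
    "num.partitions": 1,
    "num.recovery.threads.per.data.dir": 1,
    "offsets.topic.replication.factor": 1,
    "transaction.state.log.replication.factor": 1,
    "transaction.state.log.min.isr": 1,
    "log.retention.hours": 168,
    "log.segment.bytes": 1073741824,
    "log.retention.check.interval.ms": 300000,
    "zookeeper.connect": ZOOKEEPER,
    "zookeeper.connection.timeout.ms": 6000,
    "group.initial.rebalance.delay.ms": 0,
    "auto.create.topics.enable": "false",
}


def render_broker(broker_id: int) -> str:
    """Return the properties text for broker 1, 2 or 3."""
    settings = {
        "broker.id": broker_id,
        "host.name": HOST,
        "port": 9091 + broker_id,  # 9092, 9093, 9094
        "log.dirs": f"/opt/kafka-logs-{broker_id}",
    }
    settings.update(SHARED)
    return "".join(f"{key}={value}\n" for key, value in settings.items())


if __name__ == "__main__":
    for broker_id in (1, 2, 3):
        print(f"# s{broker_id}.properties")
        print(render_broker(broker_id))
```

Writing each result to config/s1.properties and so on is left out on purpose, so the sketch has no side effects.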

Start Kafka

# cd kafka_2.11-0.11.0.0
# bin/kafka-server-start.sh -daemon config/s1.properties
# bin/kafka-server-start.sh -daemon config/s2.properties
# bin/kafka-server-start.sh -daemon config/s3.properties


Check that the brokers started

# jps
131458 QuorumPeerMain
131510 QuorumPeerMain
131416 QuorumPeerMain
162396 Jps
149869 Kafka
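
When several brokers run on one host, counting the entries in the jps output is a quick sanity check. The sketch below is one way to do that; count_jvm_processes() is a hypothetical helper, and SAMPLE is simply the listing above. That listing contains a single Kafka entry, while three brokers were started, so a count like this is worth comparing against the broker logs.

```python
from collections import Counter


def count_jvm_processes(jps_output: str) -> Counter:
    """Count JVM main-class names in `jps` output (lines of '<pid> <name>')."""
    counts = Counter()
    for line in jps_output.splitlines():
        parts = line.split()
        # Lines without a class name (pid only) or with extra fields are skipped.
        if len(parts) == 2 and parts[0].isdigit():
            counts[parts[1]] += 1
    return counts


SAMPLE = """131458 QuorumPeerMain
131510 QuorumPeerMain
131416 QuorumPeerMain
162396 Jps
149869 Kafka
"""

counts = count_jvm_processes(SAMPLE)
print("Kafka:", counts["Kafka"], "QuorumPeerMain:", counts["QuorumPeerMain"])
```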

Common commands

Create a topic:
bin/kafka-topics.sh --create --zookeeper 192.168.7.203:2181 --replication-factor 1 --partitions 1 --topic consumptionsale

List all topics:
bin/kafka-topics.sh --list --zookeeper 192.168.19.98:2181

Show the details of a topic:
bin/kafka-topics.sh --zookeeper 192.168.7.203:2183 --describe --topic consumptionsale

Produce messages:
bin/kafka-console-producer.sh --broker-list 192.168.19.98:9092 --topic test

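Consume messages (the first form uses the new consumer through --bootstrap-server, the second the old ZooKeeper-based consumer):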
bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.98:9092 --topic test --from-beginning

bin/kafka-console-consumer.sh --zookeeper 192.168.19.98:2181 --topic test --from-beginning

Start Kafka in the background:
bin/kafka-server-start.sh -daemon config/server-2.properties

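Adding a node to Kafka

Adding a node is simple: start one more Kafka broker that uses the same cluster's ZooKeeper configuration, and give it its own broker.id, IP, port and storage path.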
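
As a rough illustration, the configuration of a new node can be derived from an existing one by overriding only the per-broker keys. In this sketch derive_broker_config() is a hypothetical helper, BASE is a shortened copy of s1.properties, and the values for the fourth broker (id 4, port 9095, /opt/kafka-logs-4) are assumptions made for the example.

```python
def derive_broker_config(base_text: str, overrides: dict) -> str:
    """Return base_text with the given keys replaced; other lines are kept as-is."""
    remaining = dict(overrides)
    out = []
    for line in base_text.splitlines():
        key, sep, _ = line.partition("=")
        key = key.strip()
        if sep and key in remaining:
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)
    # Keys that were not present in the base file are appended at the end.
    out.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(out) + "\n"


# Shortened copy of s1.properties; zookeeper.connect stays identical for the new node.
BASE = """broker.id=1
host.name=192.168.7.203
port=9092
log.dirs=/opt/kafka-logs-1
zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
"""

# Assumed values for a fourth broker on the same host.
new_text = derive_broker_config(
    BASE, {"broker.id": 4, "port": 9095, "log.dirs": "/opt/kafka-logs-4"}
)
print(new_text)
```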

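Adjusting Kafka topics

The partition count and replica count of a topic are fixed when it is created. A Kafka cluster normally only switches partition leaders when a broker fails; it does not otherwise change where partitions and replicas are stored. When the cluster is expanded or migrated, the brokers that hold the partitions and replicas have to be changed manually.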

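When migrating partitions, it is best to keep one replica of each partition on its original disk at first, so that normal consumption and production are not affected: moving all replicas at once can interrupt consuming and producing, whereas a partial migration lets them continue normally.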
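
One possible reading of this advice is a two-phase plan per partition: first move all but one replica, then move the last one. The sketch below only computes the replica lists; staged_plan() is a hypothetical helper, and keeping the first current replica is an assumption made for the example. Each phase would be written into a reassignment JSON file, executed and verified before the next one.

```python
def staged_plan(current, target):
    """Return the replica lists to apply, in order, for one partition.

    If current and target already share a broker, one step is enough.
    Otherwise phase 1 keeps the first current replica and fills the other
    slots with target brokers; phase 2 is the final target.
    """
    if set(current) & set(target):
        return [list(target)]
    kept = current[0]
    phase1 = [kept] + list(target[: len(target) - 1])
    return [phase1, list(target)]


# Example: a partition on brokers 2,1 that should end up on brokers 3,4.
for step, replicas in enumerate(staged_plan([2, 1], [3, 4]), start=1):
    print(f"phase {step}: replicas {replicas}")
```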

Query the detailed information of the topic:

# bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testpanjunbai
Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: testpanjunbai	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: testpanjunbai	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 2,1

This shows that the topic has 2 partitions and 2 replicas, located on brokers 1 and 2.
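
For scripting, the describe output can be turned into a small data structure. This is a sketch that assumes the tab-separated layout shown above; parse_describe() and PARTITION_RE are names made up for the example.

```python
import re

# Matches the per-partition lines; the summary line ("PartitionCount:") does not match.
PARTITION_RE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def parse_describe(output: str) -> list:
    """Parse `kafka-topics.sh --describe` output into one dict per partition."""
    partitions = []
    for line in output.splitlines():
        match = PARTITION_RE.search(line)
        if not match:
            continue
        partitions.append({
            "topic": match.group("topic"),
            "partition": int(match.group("partition")),
            "leader": int(match.group("leader")),
            "replicas": [int(b) for b in match.group("replicas").split(",")],
            "isr": [int(b) for b in match.group("isr").split(",") if b],
        })
    return partitions


SAMPLE = (
    "Topic:testpanjunbai\tPartitionCount:2\tReplicationFactor:2\tConfigs:\n"
    "\tTopic: testpanjunbai\tPartition: 0\tLeader: 2\tReplicas: 2,1\tIsr: 2,1\n"
    "\tTopic: testpanjunbai\tPartition: 1\tLeader: 1\tReplicas: 1,2\tIsr: 2,1\n"
)

parts = parse_describe(SAMPLE)
brokers = sorted({b for p in parts for b in p["replicas"]})
print(len(parts), "partitions on brokers", brokers)
```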

Create a JSON file that lists the topics to migrate:

# cat  topics-to-move.json 
{
  "topics": [
    {"topic": "testpanjunbai"}
  ],
  "version":1
}
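
When several topics are moved at once, the same file can be produced programmatically. A small sketch, where topics_to_move() is a hypothetical helper name:

```python
import json


def topics_to_move(topics) -> str:
    """Return the JSON text expected by --topics-to-move-json-file."""
    document = {"topics": [{"topic": name} for name in topics], "version": 1}
    return json.dumps(document, indent=2)


print(topics_to_move(["testpanjunbai"]))
```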

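Use the command below to generate a reassignment plan for Kafka to execute, moving the topic onto brokers 1, 2, 3, 4 and 5; the proposed plan it prints is saved here as expand-cluster-reassignment.json:

# /root/kafka_2.12-0.11.0.3/bin/kafka-reassign-partitions.sh --zookeeper 192.168.226.100:2183 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5" --generate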
# cat expand-cluster-reassignment.json 

{"version":1,
  "partitions":[{"topic":"testpanjunbai","partition":0,"replicas":[1,3]},
 		{"topic":"testpanjunbai","partition":1,"replicas":[2,3]}]
}



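The generated JSON file describes which brokers will hold each partition's replicas after the migration. It can also be adjusted by hand at this point, for example: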

# cat expand-cluster-reassignment.json

{"version":1,
  "partitions":[{"topic":"testpanjunbai","partition":0,"replicas":[3,4]},
 		{"topic":"testpanjunbai","partition":1,"replicas":[4,5]}]
}
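
A hand-edited plan is easy to get wrong, so it can help to build it from a mapping and check it first. The sketch below produces the same structure as the adjusted file above; build_reassignment() and its checks (no duplicate broker within a partition, only brokers from the allowed list) are assumptions made for the example, not something the Kafka tool requires you to run.

```python
import json


def build_reassignment(topic, assignment, allowed_brokers):
    """Build the reassignment document from {partition: [broker ids]}."""
    allowed = set(allowed_brokers)
    partitions = []
    for partition in sorted(assignment):
        replicas = list(assignment[partition])
        if len(set(replicas)) != len(replicas):
            raise ValueError(f"partition {partition}: duplicate broker in {replicas}")
        unknown = set(replicas) - allowed
        if unknown:
            raise ValueError(f"partition {partition}: unknown brokers {sorted(unknown)}")
        partitions.append(
            {"topic": topic, "partition": partition, "replicas": replicas}
        )
    return {"version": 1, "partitions": partitions}


plan = build_reassignment(
    "testpanjunbai", {0: [3, 4], 1: [4, 5]}, allowed_brokers=range(1, 6)
)
print(json.dumps(plan))
```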

Run the migration:

# /root/kafka_2.12-0.11.0.3/bin/kafka-reassign-partitions.sh --zookeeper 192.168.226.100:2183 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"testpanjunbai","partition":1,"replicas":[1,2]},{"topic":"testpanjunbai","partition":0,"replicas":[2,1]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

Check whether the migration has finished:

# /root/kafka_2.12-0.11.0.3/bin/kafka-reassign-partitions.sh --zookeeper 192.168.226.100:2183 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment: 
Reassignment of partition [testpanjunbai,0] completed successfully
Reassignment of partition [testpanjunbai,1] completed successfully
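
In a script, the verify step is usually repeated until every partition reports completion. The following sketch only parses output of the form shown above; reassignment_status() and all_completed() are hypothetical helpers, and any status text other than "completed successfully" is simply treated as not finished.

```python
import re

STATUS_RE = re.compile(
    r"Reassignment of partition \[(?P<topic>[^,\]]+),(?P<partition>\d+)\] (?P<status>.+)"
)


def reassignment_status(output: str) -> dict:
    """Map (topic, partition) to the status text printed by --verify."""
    result = {}
    for line in output.splitlines():
        match = STATUS_RE.match(line.strip())
        if match:
            key = (match.group("topic"), int(match.group("partition")))
            result[key] = match.group("status")
    return result


def all_completed(output: str) -> bool:
    """True only if at least one partition is listed and all are completed."""
    statuses = reassignment_status(output)
    return bool(statuses) and all(
        status == "completed successfully" for status in statuses.values()
    )


SAMPLE = """Status of partition reassignment:
Reassignment of partition [testpanjunbai,0] completed successfully
Reassignment of partition [testpanjunbai,1] completed successfully
"""

print(all_completed(SAMPLE))
```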

Check the topic status:

# /root/kafka_2.12-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.226.42:2181 --describe --topic testpanjunbai
Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: testpanjunbai	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: testpanjunbai	Partition: 1	Leader: 4	Replicas: 4,5	Isr: 4,5

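The kafka-manager tool can of course also be used to perform the migration remotely.
When a node stops and is then restarted, it is no longer the leader of its partitions, because replicas on other nodes have taken over. Within the cluster it is still the Preferred Leader, however, and a command is needed to make it the leader again.

# /root/kafka_2.12-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.226.42:2181 --describe --topic testpanjunbai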
Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: testpanjunbai	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: testpanjunbai	Partition: 1	Leader: 5	Replicas: 4,5	Isr: 4,5
# /root/kafka_2.12-0.11.0.3/bin/kafka-preferred-replica-election.sh --zookeeper 192.168.226.42:2181 
Created preferred replica election path with {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":34},{"topic":"__consumer_offsets","partition":36},{"topic":"testpanjunbai","partition":1},{"topic":"__consumer_offsets","partition":27},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":42},{"topic":"__consumer_offsets","partition":49},{"topic":"__consumer_offsets","partition":4},{"topic":"__consumer_offsets","partition":33},{"topic":"__consumer_offsets","partition":14},{"topic":"__consumer_offsets","partition":46},{"topic":"testpanjunbai","partition":0},{"topic":"__consumer_offsets","partition":24},{"topic":"__consumer_offsets","partition":28},{"topic":"__consumer_offsets","partition":6},{"topic":"__consumer_offsets","partition":37},{"topic":"__consumer_offsets","partition":43},{"topic":"__consumer_offsets","partition":21},{"topic":"__consumer_offsets","partition":15},{"topic":"__consumer_offsets","partition":11},{"topic":"__consumer_offsets","partition":30},{"topic":"__consumer_offsets","partition":2},{"topic":"test123","partition":1},{"topic":"__consumer_offsets","partition":47},{"topic":"__consumer_offsets","partition":25},{"topic":"__consumer_offsets","partition":29},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":23},{"topic":"__consumer_offsets","partition":40},{"topic":"__consumer_offsets","partition":31},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":16},{"topic":"__consumer_offsets","partition":38},{"topic":"test123","partition":0},{"topic":"__consumer_offsets","partition":44},{"topic":"__consumer_offsets","partition":10},{"topic":"__consumer_offsets","partition":3},{"topic":"__consumer_offsets","partition":35},{"topic":"__consumer_offsets","partition":26},{"topic":"__consumer_offsets","partition":39},{"topic":"__consumer_offsets","partition":13},{"topic":"__consumer_offsets","partition":17},{"topic":"__consumer_offsets","partition":22},{"topic":"__consumer_offsets","partition":9},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":41},{"topic":"__consumer_offsets","partition":48},{"topic":"__consumer_offsets","partition":18},{"topic":"__consumer_offsets","partition":32},{"topic":"__consumer_offsets","partition":12},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":5}]}
Successfully started preferred replica election for partitions Set([__consumer_offsets,32], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44], [__consumer_offsets,28], [test123,0], [__consumer_offsets,17], [__consumer_offsets,23], [__consumer_offsets,7], [testpanjunbai,0], [__consumer_offsets,4], [__consumer_offsets,29], [__consumer_offsets,35], [__consumer_offsets,3], [__consumer_offsets,24], [__consumer_offsets,41], [__consumer_offsets,0], [__consumer_offsets,38], [__consumer_offsets,13], [__consumer_offsets,8], [__consumer_offsets,5], [__consumer_offsets,39], [__consumer_offsets,36], [__consumer_offsets,40], [__consumer_offsets,45], [__consumer_offsets,15], [__consumer_offsets,33], [__consumer_offsets,37], [__consumer_offsets,21], [testpanjunbai,1], [__consumer_offsets,6], [test123,1], [__consumer_offsets,11], [__consumer_offsets,20], [__consumer_offsets,47], [__consumer_offsets,2], [__consumer_offsets,27], [__consumer_offsets,34], [__consumer_offsets,9], [__consumer_offsets,22], [__consumer_offsets,42], [__consumer_offsets,14], [__consumer_offsets,25], [__consumer_offsets,10], [__consumer_offsets,48], [__consumer_offsets,31], [__consumer_offsets,18], [__consumer_offsets,19], [__consumer_offsets,12], [__consumer_offsets,46], [__consumer_offsets,43], [__consumer_offsets,1], [__consumer_offsets,26], [__consumer_offsets,30])
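
The preferred leader of a partition is the first broker in its Replicas list, so the describe output is enough to tell which partitions need an election. A sketch of that check, using the describe output taken before the election as sample data; non_preferred() is a hypothetical helper name.

```python
import re

# Per-partition lines only; "PartitionCount:" on the summary line does not match.
LINE_RE = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)")


def non_preferred(output: str) -> dict:
    """Return {partition: (current_leader, preferred_leader)} where they differ."""
    result = {}
    for line in output.splitlines():
        match = LINE_RE.search(line)
        if not match:
            continue
        partition, leader = int(match.group(1)), int(match.group(2))
        preferred = int(match.group(3).split(",")[0])  # first replica in the list
        if leader != preferred:
            result[partition] = (leader, preferred)
    return result


BEFORE = (
    "Topic:testpanjunbai\tPartitionCount:2\tReplicationFactor:2\tConfigs:\n"
    "\tTopic: testpanjunbai\tPartition: 0\tLeader: 3\tReplicas: 3,4\tIsr: 3,4\n"
    "\tTopic: testpanjunbai\tPartition: 1\tLeader: 5\tReplicas: 4,5\tIsr: 4,5\n"
)

print(non_preferred(BEFORE))
```

Here partition 1 is led by broker 5 although broker 4 is preferred, which is the situation the election command above addresses.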

Check the topic status again:

# /root/kafka_2.12-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.226.42:2181 --describe --topic testpanjunbai
Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: testpanjunbai	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: testpanjunbai	Partition: 1	Leader: 4	Replicas: 4,5	Isr: 5,4

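Changing a partition's Preferred Leader works the same way as the migration above: change the order of the partition's replicas in expand-cluster-reassignment.json and execute it again. Once it has finished, the newly designated Preferred Leader becomes the leader.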
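
Reordering the replicas amounts to moving the desired broker to the front of the list. A minimal sketch, where with_preferred_leader() is a hypothetical helper and making broker 5 the preferred leader of partition 1 is just an example:

```python
import json


def with_preferred_leader(replicas, broker):
    """Return the replica list with `broker` moved to the first position."""
    if broker not in replicas:
        raise ValueError(f"broker {broker} is not a replica in {replicas}")
    return [broker] + [b for b in replicas if b != broker]


# Example: partition 1 currently has replicas 4,5; make broker 5 preferred.
plan = {
    "version": 1,
    "partitions": [
        {
            "topic": "testpanjunbai",
            "partition": 1,
            "replicas": with_preferred_leader([4, 5], 5),
        }
    ],
}
print(json.dumps(plan))
```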

Original article: https://www.cnblogs.com/panjunbai/p/7651512.html