kafka-环境搭建与简单入门

Kafka环境搭建与简单入门
步骤一:下载代码
下载Kafka安装包,并解压。
这里下载的版本为kafka_2.11-1.0.0.tgz。

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

步骤二:启动服务
Kafka使用Zookeeper存储元数据,如果你还没有Zookeeper服务器,你需要先启动一个Zookeeper服务器。

可以使用与Kafka打包在一起的便捷脚本来快速简单地创建一个单节点的Zookeeper实例

> bin/zookeeper-server-start.sh config/zookeeper.properties

启动Zookeeper服务后,现在启动Kafka服务。

> bin/kafka-server-start.sh config/server.properties

注:如果要使用后台启动,可以使用:

> bin/kafka-server-start.sh -daemon config/server.properties

步骤三:创建Topic
下面我们创建一个名为"test"的Topic,它只有一个分区和一个副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在我们可以使用–list参数来查看这个Topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

或者,您也可以将Kafka配置为:在读取或写入消息时,如果Topic不存在,自动创建Topic,而不是手动创建。
步骤四:发送一些消息
Kafka自带一个命令行客户端,可以从文件或标准输入中获取输入,并将其作为message发送到Kafka集群。

默认情况下,每行将作为单独的message发送。

运行Producer,然后在控制台输入一些消息已发送到服务端。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

步骤五:启动一个Consumer
Kafka还提供各类命令行的Consumer,将Topic中的消息输出到控制台。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

建议您使用两个不同的终端分别运行Producer和Consumer程序,这样您在Producer端输入的消息,会立即在Consumer端显示出来。

所有的命令行工具都有其他选项,运行不带任何参数的命令将显示更加信息的使用信息。

步骤六:加入更多节点
到目前为止,我们的Kafka集群只有一个节点,为了深入了解它,我们把集群扩展到三个节点(仍然在本地机器上,只不过是启动更多的实例而已)。

首先,为Broker实例创建配置文件:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

然后编辑这些文件并设置如下属性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的编号,必须保证唯一。此外,我们必须指定其他实例的端口和日志目录,因为我们在同一台机器上运行,否则会出现端口冲突或数据覆盖的问题。

我们前面已经启动了一个实例,现在将两个新的实例进行启动。

> bin/kafka-server-start.sh -daemon config/server-1.properties
> bin/kafka-server-start.sh -daemon config/server-2.properties

现在创建一个副本为3的新Topic:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在我们查看这个Topic的详情信息:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。

  • "leader"是负责给定分区所有读写操作的节点。每个节点都是部分分区的领导者。
  • "replicas"是复制分区日志的节点列表,不管这些是leader还是还是其他节点。
  • "isr"是一组"同步"的replicas,是replicas列表的子集,是指该分区对应的正在同步中的节点。

注意,在示例中,节点1是该Topic中唯一的分区leader。

我们可以查看最开始创建的名为test的Topic详情:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

和预想中的一样,原来的Topic只有1个副本并且在编号为0的节点上,因为我们创建该Topic时,集群中当时只有一个节点。

现在先新创建的Topic中发送一些信息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消费该Topic中的消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

让我们来测试一下容错性。Broker 1是my-replicated-topic的Leader节点,让我们来结束它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

再次查看my-replicated-topic的详情,发现leader自动切换为另一个节点,并且节点1也不在ISR列表中。

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
原文地址:https://www.cnblogs.com/shangwei/p/13404026.html