Kafka学习入门

最近工作中用到了两个很给力的项目,一个是Kafka,一个是Strom。本着自我学习并方便他人的目的,我会将我觉得比较有用的英文文档翻译在此(保留系统专有名词不作翻译)。

1kafka介绍

在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
  Apache Kafka是一个开源消息系统,由Scala写成。
  Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
  Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群由多个kafka实例组成,每个实例(server)称为broker。
  题外话:无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

Kafka是一个分布式的(distributed),可划分的(partitioned),冗余备份的持久性的日志服务(replicated commit log service). 提供消息系统(messaging system)的功能,拥有独一无二的设计。

这是什么意思呢?

首先,让我们看一下“消息(messaging)”的基本术语:

  • topics: (翻译过来叫话题)特指Kafka处理的消息源(feeds of messages)的不同分类。 【注:feeds翻译为源,饲料,在计算机领域,特指一种用户从数据源接收消息的机制(见Wikipedia),比如RSS feeds等。这里指接收messages.】

  • producers:向Kafka的一个topic发布消息的过程叫做producers。就是向kafka broker发消息的客户端。

  • consumers:订阅topics并处理其发布的消息的过程叫做consumers。就是向kafka broker取消息的客户端。

  • broker:Kafa集群中的一台或多台服务器统称为broker

所以,从总体来说,produces通过网络向Kafka的集群发送消息并转由consumers来处理。这个过程如下图所示:

alt

客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。虽然我们为Kafka提供了Java版本的客户端,但是客户端其实可以使用多种语言

2.Topics与日志

让我们首先来研究一下Kafka提供的这个抽象的概念 – topic

一个topic是指发布的消息的一个类别或消息源的名字。Kafka集群将为每一个topic划分一个日志(partitioned log)如下图所示:

alt

每一个partition是一个排好序的,不可变的消息序列。新的消息不断的追加到序列的尾部。–即持久性日志(commit log)。每一个分区(partition)中的消息,有一个名叫offset 的顺序编号,作为这条消息在partition中的标识。

Kafka集群将在设定的时间范围内,保存所有被发布的消息,不论该消息是否被处理完成。例如,如果一个日志被设置为保存2天,那么在它发布的两天之内,它都是可以被处理的,而在2天之后,它就会被系统销毁并释放掉。

Kafka的性能与数据的大小之间是常数的关系,所以保存大量的数据是没有问题的。

实际上,每个consumer中仅有的元数据(metadata)的主要部分是该consumer在日志中的位置信息,叫做offset(偏移量)。这个offsetconsumer控制,在一般情况下,consumer按照offset的顺序读取消息,但事实上consumer可以控制位置,可以以任何想要的顺序处理消息。例如一个consumer可以重置并重新处理一个已经处理过的offset

这些特点的组合使Kafka的consumers变得特别的廉价–它们能来去自如而不会对集群或者其它的consumers造成多大影响。比如,你可以使用我们的命令行工具来“tail”任意topic中的内容,而不会改变任何被已有consumers处理过的内容。

日志服务的分区有几个目的。首先,它允许日志扩展到超过单台服务器允许的大小。因为虽然每一个单独的分区必须适应承载它们的服务器,但是一个topic可以包含多个分区,所以能处理任意大小的数据。其次,它们作为并行单元–一会儿我们会了解更多。

3.分布式

日志的分区partitions分布式地部署在Kafka服务器集群上,每个服务器为一个共享的分区处理数据和请求。每一个分区的数据被冗余的备份在多台服务器上用于容错,可以通过配置来设定用于备份的服务器的数量。

每个分区有一台服务器扮演“领导”的角色,有0或多台服务器扮演“随从”。领导负责所有对分区的读写请求,同时随从们被作为领导的备份。一旦领导挂掉,随从中的一个会自动变成新的领导。每一台服务器都同时在一些分区里扮演领导而在另外一些分区中充当随从,这让集群拥有良好的负载均衡。

4.Kafka消息有序性

生产者是一个独立的集群,和kafka的broker集群,消费者集群没有太直接的干系。比如flume就可以作为生产者,内部调用kafka的客户端代码,确保把收集的数据发到kafka集群中。
  如何保证kafka全局消息有序?
  比如,有100条有序数据,生产者发送到kafka集群,kafka的分片有4个,可能的情况就是一个分片保存0-25,一个保存25-50......这样消息在kafka中存储是局部有序了。严格说,kafka是无法保证全局消息有序的,没有这个机制,只能局部有序。
  但是如果只有一个分片和一个消息的生产者,那么就相当于消息全局有序了。如果有多个消息生产者,就算只有一个分片,如果这些生产者的消息都发给这个分片,那kafka中的消息连局部有序都没有办法了。

5.Producers (生产者)

producers向他们选择的topics发布数据。每个producer负责在topic中选择将哪些消息分配给哪些分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区例如通过简单的“循环赛”的方式来或是根据一些语义划分的方法(比如根据一些消息中的键)来实现负载均衡。开发者负责如何选择分区的算法。

kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息;当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接
消息由producer直接通过socket发送到broker,中间不会经过任何"路由层",事实上,消息被路由到哪个partition上由producer客户端决定;比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。
在producer端的配置文件中,开发者可以指定partition路由的方式。
Producer消息发送的应答机制设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0:  producer不会等待broker发送ack
1:  当leader接收到消息之后发送ack
-1: 当所有的follower都同步消息成功后发送ack
request.required.acks=0

6.Consumers (消费者)

传统的消息系统有两种模式:消息队列发布/订阅。在消息队列模式中,一池子的consumers可能从一个服务器上读取数据,每个消息被分发给其中一个consumer;而在发布订阅模式中,消息被以广播的方式发给所有的consumers。Kafka通过提供了一个叫做消费者群组consumer group)的抽象概念涵盖以上两种模式。

Consumers上标记有他们所属的consumer group的名字,每个被发布到topic上的消息消息会被分发到所有订阅该topicconsumer group内部的一个consumer实例上。Consumer实例可以是一个单独的进程也可以是一个单独的机器。

Consumer Group(CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

如果所有的consumer实例都具有相同的群组,那么就像传统的队列模式一样平衡着各个consumer的负载。

如果所有的consumer实例均有不同的群组,那么这就如同发布/订阅模式,所有的消息被广播给所有的消费者。

每个group中可以有多个consumer,每个consumer属于一个consumer group;通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息

更常见的情况是,我们发现topics一般只有很少的消费者群组,一个群组一般对应一个“逻辑订阅”单元。而每一个群组由大量的consumer实例构成,用来提供可扩展性和容错性。这其实就是发布/订阅模式的一种特殊情况,只不过订阅者是一个consumers的集群而非一个单独的进程而已。

同时,Kafka具有比传统消息系统更强大的顺序保障。

传统的队列在服务器上按照一定的顺序存储消息,然后当多个consumers从队列中处理消息时,系统按照消息存储的顺序分发消息。然而,虽然系统是按照顺序送出消息的,但是是按异步的方式送达到consumer手中,所以当消息到达不同consumer手中的时候,已经没有顺序可言了。这意味着消息的顺序在并行处理中不复存在了。消息系统常常有一个权宜之计来应对这种情况,就是使用了一个叫做“独家消费”的概念,就是只允许一个进程处理队列,不过这么做的话,并行处理当然也就不复存在了。

Kafka在这一点上做的比较好。通过一个叫做“排比parallelismparallelism”的概念–即并行–在topics中, Kafka能够同时提供顺序保证和一池子消费进程间的负载均衡。

Kafka只能保证每个分区内部的消息的总体顺序,而保证同一个topic在不同分区中消息的顺序。这种每个分区有序并可以按照数据的键去分区的特性对于大多数应用都已经足够。但是,如果你需要保证所有消息的总体顺序,可以通过使用只有一个分区的topic去完成,不过这样做就意味着只有一个consumer进程了。

7.保障

在高层次上Kafka提供如下保障:

  • 由producer发送给特点topic分区的消息按照发送的先后顺序排序。也就是说,如果同一个producer发送了消息M1和M2,M1先被发送,那么M1的offset比M2的小,且M1先出现在日志中。
  • 一个consumer实例按照消息在日志中存储的顺序收到消息。
  • 对于一个有N个备份的topic,我们允许其中N-1个服务器挂掉,依然能保证不丢失任何持久性日志中的消息。

8.用例

Kafka可以用于:

  • 消息系统, 例如ActiveMQ 和 RabbitMQ.
  • 站点的用户活动追踪。 用来记录用户的页面浏览,搜索,点击等。
  • 操作审计。 用户/管理员的网站操作的监控。
  • 日志聚合。收集数据,集中处理。
  • 流处理。
  • [Event sourcing] (http://martinfowler.com/eaaDev/EventSourcing.html)
  • Commit Log

讲了Kafka的背景知识这么多,我们还是快点开始实践之旅吧。
假定你还没有任何的Kafka和Zookeeper环境。

8.1: 下载代码

下载 0.8.1 版本并解压。 (当前最新的稳定版本是0.8.1.1)

1
2
> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1

8.2: 启动服务

Kafka使用Zookeeper所以你可能先要安装一个ZooKeeper.你可以使用kafka中打包好的脚本或者一个配置好的Zookeeper.

1
2
3
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

现在可以启动Kafka了:

1
2
3
4
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

8.3: 新建一个话题Topic

Topic的名字叫"test",只有一个分区和一个备份。

1
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看存在的Topic列表:

1
2
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

除了手工创建Topic,你也可以配置你的broker当发布一个不存在的topic时自动创建topic。

8.4: 发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。

1
2
3
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

8.5: 消费消息

Kafka也提供了一个消费消息的命令行工具。

1
2
3
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

这些命令行工具有很多的选项,你可以查看他们的文档来了解更多的功能。

8.6: 设置多个broker

目前我们运行在一个broker,不好玩。
让我们来点大的。

首先为每个broker创建一个配置文件。

1
2
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

修改文件如下:

1
2
3
4
5
6
7
8
9
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

broker.id属性别重样。为了在一台机器上启动两个broker,改了一下它们的port的。
Zookeeper还在,上面用的broker还活着。 来启动这两个broker.

1
2
3
4
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

创建一个topic试试, 奢侈一把,把备份设置为3:

1
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

成了。运行 "describe topics" 命令瞧瞧:

1
2
3
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

第一行给出了分区的汇总信息。每个分区行给出分区信息。

"leader" 节点是1.
"replicas" 信息,在节点1,2,0上,不管node死活,只是列出信息而已.
"isr" 工作中的复制节点的集合. 也就是活的节点的集合.

来看看一开始创建的节点:

1
2
3
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

毫无新意,想必你已经明了了。

发布个消息:

1
2
3
4
5
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消费它:

1
2
3
4
5
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

测试一下容错. 干掉leader,也就是Broker 1:

1
2
3
> ps | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564

Leader被切换到一个follower上节, 点 1 不会被列在isr中了,因为它死了:

1
2
3
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

但是,消息没丢啊,不信你试试:

1
2
3
4
5
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

8.7生产者的例子

查看这里

8.8消费者的例子

查看这里

 参考:

https://www.cnblogs.com/intsmaze/p/6386616.html

http://www.yclod.com/kafka-jie-shao/

https://colobu.com/2014/08/06/kafka-quickstart/

 
原文地址:https://www.cnblogs.com/fnlingnzb-learner/p/10491060.html