kafka入门(1)- 基本概念

Kafka is a distributed,partitioned,replicated commit logservice

  Kafka提供了类似于JMS的特性,但是在设计实现上完全不同,并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker,无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

核心API

Topics And Logs

  一个Topic可以认为是一类消息,每个topic将被分成多个partition(如下图),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”,每个消息可以有多个订阅者。

  kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除。日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如设置log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费。kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支

  对于consumer而言,它需要保存消费消息的offset(如下图),对于offset的保存和使用,由consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费。事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值,offset将会保存在zookeeper中。

  partitions的设计目的有多个,最根本原因是kafka基于文件存储,通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来提高消息保存/消费的效率。此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。

Distribution
  一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作。此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。
  基于replicated方案,那么就意味着需要对多个备份进行调度。每个partition都有一个server为"leader",leader负责所有的读写操作。如果leader失效,那么将会有其他follower来接管(成为新的leader)。follower只是单调的和leader跟进,同步消息即可。由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",一台服务器可能同时是一个分区的leader,另一个分区的follower,这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理,确保集群的性能。
Producer
  生产者往某个Topic上发布消息。生产者也负责选择发布到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。开发者负责如何选择分区的算法。
Consumer
  通常来讲,消息模型可以分为两种, 队列和发布-订阅式。 队列的处理方式是 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了queue模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 更通用的, 我们可以创建一些消费者组作为逻辑上的订阅者。每个组包含数目不等的消费者, 一个组内多个消费者可以用来扩展性能和容错。正如下图所示:

screenshot

2个kafka集群托管4个分区(P0-P3),2个消费者组,消费组A有2个消费者实例,消费组B有4个。本质上kafka只支持Topic.每个consumer属于一个consumer group。反过来说,每个group中可以有多个consumer,发送到Topic的消息只会被订阅此Topic的每个group中的一个consumer消费。

  正像传统的消息系统一样,Kafka保证消息的顺序不变。 再详细扯几句。传统的队列模型保持消息,并且保证它们的先后顺序不变。但是, 尽管服务器保证了消息的顺序,消息还是异步的发送给各个消费者,消费者收到消息的先后顺序不能保证了。这也意味着并行消费将不能保证消息的先后顺序。用过传统的消息系统的同学肯定清楚,消息的顺序处理很让人头痛。如果只让一个消费者处理消息,又违背了并行处理的初衷。 在这一点上Kafka做的更好,尽管并没有完全解决上述问题。 Kafka采用了一种分而治之的策略:分区。 因为Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

  如果所有的consumer都具有相同的group,这种情况和queue模式很像,消息将会在consumers之间负载均衡。

  如果所有的consumer都具有不同的group,那这就是"发布-订阅",消息将会广播给所有的消费者。

  在kafka中,一个partition中的消息只会被group中的一个consumer消费。每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费。不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。

  kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

使用场景
 #Messaging
  对于一些常规的消息系统,kafka是个不错的选择。partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势。不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性。kafka只能使用作为"常规"的消息系统。在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)。
#Websit activity tracking
  kafka可以作为"网站活性跟踪"的最佳工具,可以将网页/用户操作等信息发送到kafka中,并实时监控,或者离线统计分析等。
#Log Aggregation
  kafka的特性决定它非常适合作为"日志收集中心"。application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中。kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支。此时consumer端可以使hadoop等其他系统化的存储和分析系统。
设计原理
#持久性
  kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何OS下,对文件系统本身的优化几乎没有可能。文件缓存/直接内存映射等是常用的手段。因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的。同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
#性能
  需要考虑的影响性能点很多,除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题。kafka并没有提供太多高超的技巧,对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker。对于consumer端也是一样,批量fetch多条消息。不过消息量的大小可以通过配置文件来指定。对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能,将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换。其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略。压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑,可以将任何在网络上传输的消息都经过压缩,kafka支持gzip/snappy等多种压缩方式。
#生产者
  负载均衡, producer将会和Topic下所有partition leader保持socket连接。消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。事实上,消息被路由到哪个partition上,有producer客户端决定。比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。
  其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件。异步发送,将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。
#消费者
  consumer端向broker发送"fetch"请求,并告知其获取消息的offset,此后consumer将会获得一定条数的消息,consumer端也可以重置offset来重新消费消息。
  在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端。不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息
  这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset)。此外,消费者可以良好的控制消息消费的数量,batch fetch
#轻量级
  其他JMS实现,消息消费的位置是由prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态,这就要求JMS broker需要太多额外的工作。而在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset,由此可见,consumer客户端也很轻量级。
#消息传送机制
  对于JMS实现,消息传输担保非常直接:有且只有一次(exactly once)。在kafka中稍有不同:
  1) at most once: 最多一次。这个和JMS中"非持久化"消息类似,发送一次,无论成败,将不会重发。消费者fetch消息,然后保存offset,然后处理消息。当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理,那么此后"未处理"的消息将不能被fetch到,这就是"at most once"。
  2) at least once: 消息至少发送一次。如果消息未能接受成功,可能会重发,直到接收成功。 消费者fetch消息,然后处理消息,然后保存offset。如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。
  3) exactly once:消息只会发送一次。 kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的。
  通常情况下"at-least-once"是我们首选。(相比at most once而言,重复接收数据总比丢失数据要好).
#复制备份
  kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有)。备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步。Follower和consumer一样,消费消息并保存在本地日志中。leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除。当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它。即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可(不同于其他分布式存储,比如hbase需要"多数派"存活才行)。
  当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower。选择follower时需要兼顾一个问题,就是新leaderserver上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力,在选举新leader,需要考虑到"负载均衡"。
#日志
    如果一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中。日志文件中保存了一序列"log entries"(日志条目),每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容"。每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置。每个partition在物理存储层面,有多个log file组成(称为segment),segmentfile的命名为"最小offset".kafka,例如"00000000000.kafka"。其中"最小offset"表示此segment中起始消息的offset。
 
    其中每个partiton中所持有的segments列表信息会存储在zookeeper中。
    当segment文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件。当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时如果"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件。如果broker失效,极有可能会丢失那些尚未flush到文件的消息。因为server意外出现,仍然会导致log文件格式的破坏(文件尾部),那么就要求当server启东是需要检测最后一个segment的文件结构是否合法并进行必要的修复。
    获取消息时,需要指定offset和最大chunk尺寸。offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数)。根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可。
    日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间)。为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式。
原文地址:https://www.cnblogs.com/ijavanese/p/9238377.html