kafka

1. 基本介绍

  Kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态

  kafka的集群由多个Broker服务器组成,每个类型的消息被定义为topic同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的Broker,消息生产者 producer 和消费者consumer可以在多个Broker上生产/消费topic

2. 核心思想

  (1)消息队列是以log文件的形式存储,消息生产者只能将消息添加到既有的文件尾部,没有任何ID信息用于消息的定位,完全依靠文件内的位移,因此消息的使用者只能依靠文件位移顺序读取消息,这样也就不需要维护复杂的支持随即读取的索引结构。

  (2)kafka broker完全不维护和协调多用户使用消息的行为模式,用户自己维护位移用来索引消息

  (3)最小的并发访问单位就是partition分区,同一用户组内的所有用户(可以理解为同一个应用的所有并发进程)只能有一个访问同一分区,同时分区的个数是固定的,不支持动态调整。这样最大简化了多进程/分布式client之间对消息处理访问的并发控制的复杂度,当然也带来一定的使用模式上的限制(比如最大并发度完全取决于预先规划的partition的个数),此外分区也带来一个问题就是消息只是分区内部有序而不是全局有序的。如果需要全局有序,应用需要自己靠别的机制来保证

  (4)使用主动拉(Pull)的模式派发消息,消息的使用情况,比如是否还有consumer没有读取,是否重复读取(改进中)等,在Broker端也完全不跟踪维护,消息的过期处理简单的由定时器定时删除(比如保留7天),由此简化各种消息跟踪维护的开销。

  (5)0.8版本中,添加了数据replica的机制,一个消息分区的多个replica分布在不同的Broker上,由leader replica负责日常读写通过zookeeper监督follower,不同的分区的leader replica均衡负载到不同的Broker上。在这种情况下,producer可以选择不等待leader replicaAck,部分Ack,或者完全备份完毕后Ack等不同的ack机制。这三种机制,性能依次递减 (producer吞吐量降低1-3),数据健壮性则依次递增。

   (6) 采取各种方式最大化数据传输效率,比如生产者和消费者可以批量读写消息减少RPC开销,使用Zero Copy方式在内核层直接将文件内容传送给网络Socket,避免应用层数据拷贝

  (7)激进的内存管理模式,基本的意思就是不管理。kafka不在JVM进程内部维护消息Cache,消息直接从文件中读写,完全依赖操作系统在文件系统层面的cache,避免在JVM中管理Cache带来的额外数据结构开销和GC带来的性能代价。基于批量处理和顺序读写的应用模式,最大化利用文件系统的Cache机制和规避文件读写相对内存读写的性能代价。

3. 存储策略

  (1) kafka以topic来进行消息管理,每个topic包含多个partition,每个 partition 对应一个逻辑 log,由多个segment组成
  (2) 每个segment中存储多条消息,如下图所示,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
    (3) 每个partition 在内存中对应一个index,记录每个segment中的第一条消息偏移
  (4) 发布者发到某个topic的消息会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment

4. 消息发布和订阅

   (1) 发布消息时,kafka client先构造一条消息,将消息加入到消息集set中( kafka支持批量发布,可以往消息集合中添加多条消息,一次行发布),send消息时,client需指定消息所属的topic。

   (2)订阅消息时,kafka client需指定topic以及partition num(每个partition对应一个逻辑日志流,如topic代表某个产品线,partition代表产品线的日志按天切分的结果)client订阅后,就可迭代读取消息,如果没有消息,client会阻塞直到有新的消息发布

     (3)consumer可以累积确认接收到的消息,当其确认了某个offset的消息,意味着之前的消息也都已成功接收到,此时broker会更新zookeeper上地offset registry

   (4)一个topic可以被多个 Consumer group 分别消费 ,但是每个Consumer group中只能有一个Consumer消费此消息,一个group内的consumer只能消费不同的partition,即一个partition只能被一个consumer消费

5. Zookeeper 协调控制

  (1) 管理broker与consumer的动态加入与离开,每个broker启动后,会在zookeeper上注册一个临时的节点(broker registry):包含broker的ip地址和端口号,所存储的topics和partitions信息。每个consumer启动后会在zookeeper上注册一个临时的节点(consumer registry):包含consumer所属的consumer group以及订阅的topics。

  (2) 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一consumer group内的多个consumer的订阅负载平衡。

  (3) 维护消费关系及每个partion的消费信息。每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

  总结:

  kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

  (1)Broker node registry::当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,znode也会被删除。

格式:/broker/ids/[0...N] -->host:port,其中[0..N]表示broker id,每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息。

  (2) Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

格式:/broker/topics/[topic]/[0...N],其中[0..N]表示partition索引号.

  (3) Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡"。一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,partition相对均衡的分散到每个consumer上。

  (4) Consumer id Registry:每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息,

格式: /consumers/[group_id]/ids/[consumer_id],仍然是一个临时的znode,此节点的值为{"topic_name":#streams...},即表示此consumer目前所消费的topic + partitions列表.

  (5) Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset。

格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value,znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

  (6) Partition Owner registry: 用来标记partition被哪个consumer消费,临时znode

格式: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -->consumer_node_id

  当consumer启动时,所触发的操作:

    A) 首先进行"Consumer id Registry";

    B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

    C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

  总而言之:

    (1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.

    (2) Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.

    (3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.

6.生产者和消费者

  (1) 生产者  

  负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,producer客户端决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件.

  异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。

  (2)消费者

  consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

  其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级.

 7. kafka消息传送机制的单重情况

  1) at most once: 最多一次(保存offset到zookeeper成功,消息处理失败,这条消息将不能被fetch到.

  2) at least once: 消息至少发送一次(消息处理成功了,保存offset到zookeeper失败了,下次会重新fetch这条消息).

  3) exactly once: 消息只会发送一次(offset和处理数据都成功了).

  at most once: 消费者fetch(拿)消息,保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后"未处理"的消息将不能被fetch到,这就是"at most once".

  at least once: 消费者fetch消息,处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态.

  exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的.

  通常情况下"at-least-once"是我们搜选.(相比at most once而言,重复接收数据总比丢失数据要好).

8. 复制备份

  备份分主从,leader跟踪floewr状态,太差就从列表中干掉,保证一个好用就行。所以producer保存数据时,leader和flower都成功才算成功。leader死了再选一个数据最全的,不过也要考虑这个broker上是不是太多leader了。

  kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower和consumer一样,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.(不同于其他分布式存储,比如hbase需要"多数派"存活才行)

  当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower.选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡".

9. 日志文件格式

  如果一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了一系列"log entries"(日志条目),每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置。每个partition在物理存储层面,由多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

注意:

  (1)其中每个partiton中所持有的segments列表信息会存储在zookeeper中.

  (2)当segment文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件;

  (3)当buffer中消息的条数达到阀值时(或者距离最近一次flush的时间差"达到阀值)将会触发日志信息flush到日志文件中。

  (4)验证与修复:如果broker失效,极有可能会丢失那些尚未flush到文件的消息.因为server意外实现,仍然会导致log文件格式的破坏(文件尾部),那么就要求当server启动是需要检测最后一个segment的文件结构是否合法并进行必要的修复.

  (5)获取消息时,需要指定offset和最大chunk尺寸:offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.

  (6)日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式(在复制一个对象的时候并不是真正的把原先的对象复制到内存的另外一个位置上,而是在新对象的内存映射表中设置一个指针,指向源对象的位置,并把那块内存的Copy-On-Write位设置为1.这样,在对新的对象执行读操作的时候,内存数据不发生任何变动,直接执行读操作;而在对新的对象执行写操作时,将真正的对象复制到新的内存地址中,并修改新对象的内存映射表指向这个新的位置,并在新的内存位置上执行写操作。).

10. 主要配置

(1)Broker配置

 1 broker.id=1
 2 port=9091
 3 num.network.threads=2
 4 num.io.threads=2
 5 socket.send.buffer.bytes=1048576
 6 socket.receive.buffer.bytes=1048576
 7 socket.request.max.bytes=104857600
 8 log.dir=./logs
 9 num.partitions=2
10 log.flush.interval.messages=10000
11 log.flush.interval.ms=1000
12 log.retention.hours=168
13 #log.retention.bytes=1073741824
14 log.segment.bytes=536870912
15 num.replica.fetchers=2
16 log.cleanup.interval.mins=10
17 zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183
18 zookeeper.connection.timeout.ms=1000000
19 kafka.metrics.polling.interval.secs=5
20 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
21 kafka.csv.metrics.dir=/tmp/kafka_metrics
22 kafka.csv.metrics.reporter.enabled=false

2. Consumer主要配置

3. procedure主要配置

补充说明:

(1)public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中该方法的参数Map的key为topic名称,value为topic对应的分区数,譬如说如果在kafka中不存在相应的topic时,则会创建一个topic,分区数为value,如果存在的话,该处的value则不起什么作用
(2)关于生产者向指定的分区发送数据,通过设置partitioner.class的属性来指定向那个分区发送数据,如果自己指定必须编写相应的程序,默认是kafka.producer.DefaultPartitioner,分区程序是基于散列的键。

 (3)在多个消费者读取同一个topic的数据,为了保证每个消费者读取数据的唯一性,必须将这些消费者group_id定义为同一个值,这样就构建了一个类似队列的数据结构,如果定义不同,则类似一种广播结构的。

 (4)consumer api中,参数涉及到数字部分,类似Map<String,Integer>,numStream,指的都是在topic不存在的时,会创建一个topic,并且分区个数为Integer.numStream,注意如果数字大于broker的配置中num.partitions属性,会以num.partitions为依据创建分区个数的。

 (5)producer api,调用send时,如果不存在topic,也会创建topic,在该方法中没有提供分区个数的参数,在这里分区个数是由服务端broker的配置中num.partitions属性决定的



 以上总结,均来自传智播客,感谢传智博客王森丰老师

原文地址:https://www.cnblogs.com/sunfie/p/7252523.html