快速理解Kafka分布式消息队列框架

由于上一节中提到唯品会应用的技术,在这里,先说一下消息发送,kafka

 二、搭建Kafka开发环境
搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。创建好maven项目后,在pom.xml中添加以下依赖:

  1. <dependency>
  2.         <groupId> org.apache.kafka</groupId >
  3.         <artifactId> kafka_2.10</artifactId >
  4.         <version> 0.8.0</ version>
  5. </dependency>
配置程序

首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:
  1. package com.sohu.kafkademon;
  2. public interface KafkaProperties
  3. {
  4.     final static String zkConnect = "10.22.10.139:2181";
  5.     final static String groupId = "group1";
  6.     final static String topic = "topic1";
  7.     final static String kafkaServerURL = "10.22.10.139";
  8.     final static int kafkaServerPort = 9092;
  9.     final static int kafkaProducerBufferSize = 64 * 1024;
  10.     final static int connectionTimeOut = 20000;
  11.     final static int reconnectInterval = 10000;
  12.     final static String topic2 = "topic2";
  13.     final static String topic3 = "topic3";
  14.     final static String clientId = "SimpleConsumerDemoClient";
  15. }

Kafka是由Linkedin开发的一个分布式的消息队列系统(Message Queue)

Kafka解决的问题:

kafka开发的主要初衷目标是构建一个用来处理海量日志,用户行为和网站运营统计等的数据处理框架。在结合了数据挖掘,行为分析,运营监控等需求的情况下,需要能够满足各种实时在线和批量离线处理应用场合对低延迟和批量吞吐性能的要求。从需求的根本上来说,高吞吐率是第一要求,其次是实时性和持久性。

 

既有的消息队列框架或者对消息传送的可靠性提供了较高的保证,由此带来较大的负担,不能满足海量高吞吐率的要求;或者完全面向实时消息处理系统,对于批量离线处理的场合无法提供足够的缓存和持久性要求。

而多数针对大数据开发应用的日志收集处理系统(e.g. scribe, flume)则通常更适合批量离线处理场合,对实时在线处理的场合支持不够。

总体而言,kafka试图提供一个同时满足在线和离线处理海量数据的消息派发系统。

kafka实现的原理:

kafka的集群有多个Broker服务器组成,每个类型的消息被定义为topic,同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的Broker上,消息生产者producer和消费者consumer可以在多个Broker上生产/消费topic

 

核心思想

以高效率作为第一设计原则,kafka的结构设计在很多方面都做了激进的取舍。

=极简的数据结构和应用模式 =

消息队列是以log文件的形式存储,消息生产者只能将消息添加到既有的文件尾部,没有任何ID信息用于消息的定位,完全依靠文件内的位移,因此消息的使用者只能依靠文件位移顺序读取消息,这样也就不需要维护复杂的支持随即读取的索引结构。

kafka broker完全不维护和协调多用户使用消息的行为模式,用户自己维护位移用来索引消息。

最小的并发访问单位就是partition分区,同一用户组内的所有用户(可以理解为同一个应用的所有并发进程)只能有一个访问同一分区,同时分区的个数是固定的,不支持动态调整。这样最大简化了多进程/分布式client之间对消息处理访问的并发控制的复杂度,当然也带来一定的使用模式上的限制(比如最大并发度完全取决于预先规划的partition的个数)

此外分区也带来一个问题就是消息只是分区内部有序而不是全局有序的。如果需要全局有序,应用需要自己靠别的机制来保证。

使用Pull模式派发消息,消息的使用情况,比如是否还有consumer没有读取,是否重复读取(改进中)等,在Broker端也完全不跟踪维护,消息的过期处理简单的由定时器定时删除(比如保留7天),由此简化各种消息跟踪维护的开销。

=采取各种方式最大化数据传输效率 =

比如生产者和消费者可以批量读写消息减少RPC开销

使用Zero Copy方式在内核层直接将文件内容传送给网络Socket,避免应用层数据拷贝

使用合理的压缩格式等

=激进的内存管理模式 =

基本的意思就是不管理。。。kafka不在JVM进程内部维护消息Cache,消息直接从文件中读写,完全依赖操作系统在文件系统层面的cache,避免在JVM中管理Cache带来的额外数据结构开销和GC带来的性能代价。基于批量处理和顺序读写的应用模式,最大化利用文件系统的Cache机制和规避文件读写相对内存读写的性能代价。

= HA =

 

kafka0.8之前message是没有备份容错机制的,producer的工作模式是fire and forget,如果一个broker失效,那么相关topic分区的相关消息也就丢失了。这种设计的原因在于最初的应用模式,如日志/用户行为等消息的处理,对数据的健壮性方面要求不高,可以容忍部分数据的缺失。采用fire and forget 模式,不需要等待Broker ack,有利于提高producer的吞吐率。

不过在0.8版本中,添加了数据replica的机制,一个消息分区的多个replica分布在不同的Broker上,由leader replica负责日常读写,通过zookeeper监督failover,不同的分区的leader replica均衡负载到不同的Broker上。在这种情况下,producer可以选择不等待leader replicaAck,部分Ack,或者完全备份完毕后Ack等不同的ack机制。这三种机制,性能依次递减 (producer吞吐量降低1-3),数据健壮性则依次递增。

Kafka是一个分布式的消息队列,学习见Apache Kafka文档,中文翻译见Kafka分享,一个简单的入门例子见kafka代码入门实例

1.架构

Producer/Consumer:消息的生成者和使用者。
Broker:kafka server充当broker角色,起到消息队列的作用。
topic/partion:topic是一类消息的名称,一个topic下的消息可以分成多区(partion)存储,一个分区是一个有序队列(消息按接收时间依次追加,利用offset做为唯一id),分区间消息无序。
zookeeper:broker和consumer向zk注册,实现元数据的保存和交换、集群管理。
push/pull:producer通过一个初始broker.list与broker建立连接,获取所有broker信息,主动向一个topic的分区leader推送信息;consumer通过zk获取broker列表,主动从broker拉取信息。
consumer group:为了提高consumer处理并行性,多个consumer可以组成一个group,一个topic下的消息会保证每个group中的一个consumer消费,一个group中的consumer交错消费整个topic。简单说,topic下的一条消息会给所有的group,但一个group中只有一个consumer接收到该消息。

2.数据存储与备份

一个机子存储不下一个topic时,如何做?又如何做消息冗余备份呢?
这就是分区的目的,在创建topic时可以指定其分区数、备份数,消息会被hash到不同的分区存储(其hash过程由producer在客户端处理),每个分区有自己的机器。
 
 
如上,该kafka集群有3个broker(3个节点),名字为my-replicated-topic-1的topic有4个partition,每个partition有2个replica(每条消息备份2次,每个replica位于不同的节点),一个partion有一个leader负责该partition内的数据读写。producer根据消息的key hash到一个partition后,直接和该partition的leader通信。
其中ISR(in-sync replicas)同步列表是该partition中处于同步状态的备份broker,如果一个broker长时间不和leader通信,或者消息数与leader相差太大,leader就会将该broker从ISR队列中移除,该broker就会后台从leader fetch数据,直到up-to-date,然后再次加入ISR同步列表。
这些patition、replica信息与状态都存储在zookeeper中,便于leader重新选举和信息同步。
原文地址:https://www.cnblogs.com/huojg-21442/p/7514920.html