Apache Kafka框架学习

背景介绍

消息队列的比较

kafka框架介绍

  术语解释

  文件存储

  可靠性保证

  高吞吐量实现

  负载均衡

应用场景

背景介绍:

  kafka是由Apache软件基金会维护的一个开源流处理平台,由scala和java编写。最早开发自LinkedIn,用做LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。

  kafka是一种分布式的,基于发布/订阅的消息系统。

  kafka特点:

    快速持久化,可以在o(1)的系统开销下进行消息持久化;

    高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;

    完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;

    kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。

    Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

消息队列:

使用消息队列的好处:

   解耦、扩展性、灵活性&峰值处理能力、可恢复性、顺序保证、缓冲......

RabbitMQ

  RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP,SMTP,STOMP,也正因如此,它非常重量级。

Redis

  Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

ZeroMQ

  ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你只需要简单的引用ZeroMQ程序库,就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。

ActiveMQ

  ActiveMQ是Apache下的一个子项目。类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它以少量代码就可以高效的实现高级应用场景。

kafka框架:

  Broker:kafka集群包含一个或多个服务器,每个服务器被称为一个broker。

  Producer:负责发布消息到Kafka broker。

  Consumer:消息消费者,从Kafka broker读取消息的客户端。

  Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。当多个Consumer属于同一个Group时,它们所订阅的消息只会发布到该组的producer;当需要每个Consumer都接受到消息时,可以赋予不同的id。

  Topic:每条发布到Kafka集群的消息都有一个类别。这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。

  Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition;建议每个Topic的Partition数量不超过集群中的broker数量。

  Replica:kafka从0.8版本开始引入了副本机制,目的是为了增加Kafka的高可用性。每个Partition会有多个副本,并且从副本集合中(Assigned Replic,AR)中选取一个副本作为leader副本,所有的读写请求都由leader副本处理。剩余的副本作为Follower副本,Follower副本从leader副本获取消息并更新至自己的Log中。如果leader副本所在的Broker出现故障,会从Follower副本选择一个作为Leader提供服务,保证Kafka的高可用性。

  Topic&Partition&Replica示意图

 

  Topic&Partition&Replica分配算法:

  1.将所有存活的N个Brokers和待分配的Partition排序;

  2.将第i个Partition分配到第(i mod n)个Broker上;并且会作为Partition的优先副本(这里就基本说明了一个topic的leader partition在集群上的大致分布情况);

  3.将第i个Partition的第j个Replica分配到第((i + j)mod n)个Broker上。

  假设集群一共有4个brokers,一个topic有4个partition,每个Partition有3个副本,下图是每个Broker上的副本分配情况。

  

Kafka框架-文件存储机制:

  kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。

  Topic&partition&Message关系图:

 

  partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录:page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为:<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

Partition的数据文件:

  Partition中的每条Message由offset来表示它在这个partition中的偏移量,不是该Message在partition数据文件中的实际存储位置,而是逻辑上的一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:offset、messageSize、data。其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。

   如上描述,如果每个Partition对应一个存储文件,当一个Partition上存储大量消息时,追加消息的复杂度为o(1);查找一个消息时,需要遍历整个文件,复杂度o(n)。

Kafka解决方案:

  分段:

    比如有100条Message,它们的offset是从0到99.假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

  

  索引:

    Kakfa为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的。只是文件扩展名为.index。index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。

  消息查找过程:

    比如:要查找绝对offset为7的Message:

      1.用二分查找确定它是在那个LogSegment中:在第一个Segment中。

      2.打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中位置为9087.

      3.打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

      这套机制是建立在offset是有序的。索引文件居中,所以查找的速度还是挺快的。一句话,kafka的Message存储采用了分区(partition),分段(LogSegment)映射到内存和稀疏索引这几个手段来达到了高效性。

Kafka框架-数据可靠性保证:

Broker分析:

  对于broker,落盘的数据,除非磁盘坏了,一般不会丢的。

  对于内存脏(没有flush磁盘)数据,broker重启会丢,可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些。

  Replica机制:是否使用replica取决于在可靠性和资源代价之间的平衡。

Consumer从Broker拉取消息:

  Kafka中有两种consumer接口,分别为Low-level API和High-level API

  (1).Low-level API SimpleConsumer,这套接口比较复杂的,使用者必须要考虑很多事情,优点就是对Kafka可以有完全的控制。

  (2).High-level API 使用比较简单,已经封装了partition和offset的管理,默认是会定期自动commit offset,这样可能会丢数据,因为consumer可能拿到数据没有处理完crash。High-level API接口的特点,自动管理,使用简单,但是对Kafka的控制不够灵活。

  一种非常常用的选举leader的方式是“majority vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个replica(包含leader和follower),那在commit之前必须保证有f+1个replica复制完消息,为了确保正确选出新的leader,fail的replica不能超过f个。因为在剩下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几台sever,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的replica,而大量的replica又会在大数据量下导致性能急剧下降。这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种expensive的方式。

如何确定一个Broker是否还活着?

  1.它必须维护与Zookeeper的session(这个通过Zookeeper的Heartbeat机制来实现)。

  2.Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。

  Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多“指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。

  这里的复制机制既不是同步复制,也不是单纯的异步复制。事实上,同步复制要求”活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文又说到,只要follower落后leader不太远,则被认为在“in sync” list里)。

接受数据的可靠性保证:

  当producer向leader发送数据时,request.required.acks参数来设置可靠性的级别:

  1(默认):producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。

  0:producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性却是最低的。

  -1:producer需要等待ISR中的所有follower都确认接受到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就剩一个leader),这样就变成了acks=1的情况。

接受数据可靠性保证:

  如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效。

  min.insync.replicas这个参数设定ISR中的最小副本数是多少,默认为1,当且仅当request.required.acks参数设置为-1时,此参数才生效。如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常。

  request.required.acks=-1,同步(Kafka默认为同步,即producer.type=sync)的发送模式。

  replication.factor>=2且min.insvnc.replicas>=2的情况下,不会丢失数据。注:Kafka只处理fail/recover问题,不处理Byzantine问题。

     

  图4中如果选举后一个为leader,则前一个partition的HW不会更新,新消息继续从offset为5的地方存储;此时,producer没有收到ack消息,会继续发送消息4、5,此时产生重复;kafka不解决,有用户自己结局,比如在消息中添加全局Key。

Kafka框架-高吞吐量:

  顺序读/写文件、批量消息传递、数据压缩、Kakfa的消息存储在OS pagecache(页缓存,pagecache的大小为一页,通常为4K,在Linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上影像和数据的访问)、Topic分为多个Partition,多个Parttition同时提供服务。

Kafka框架-负载均衡:

  producer根据用户指定的算法,将消息发送到指定的partition;

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
  private val random = new java.util.Random
  def partition(key: T, numPartitions: Int): Int = {
    if(key == null)
    {
        println("key is null")
        random.nextInt(numPartitions)
    }
    else
    {
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

  Partition&replica均衡:存在多个partition,每个partition有自己的replica,每个replica分布在不同的Broker节点上;

leader均衡:

  每当Leader Partition停止或崩溃领导,由其他副本取代Leader地位。这意味着默认情况下,当该Partition更新启动时,它将只是作为跟随着,不会用于客户端读取和写入,出现于其它Leader在同一个Broker的情况。

  为了避免这种不平衡,Kafka有一个优先副本的概念。如果分区的副本的列表为1,5,9,则节点1优选为节点5或9的引导者,因为它在副本列表中较早。

  设置auto.leader.rebalance.enable = true即可实现上述操作;

  等待ISR中的任一个replica“活”过来,并且选它为leader。选择第一个“活”过来的replica(不一定是ISR中的)作为leader。

  这就需要在可用性和一致性当中做出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会行对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replcia作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,他也会成为leader而作为comsumer的数据源(前文有说明,所有读写都由leader完成)。Kafka0.8.×使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

Consumer 均衡:

  Kafka保证的是稳定状态下每一个Consumer实例只会消费某一个或多个特定Partition的数据。而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个Consumer Group里的Consumer均匀消费数据,优势是每个Consumer不用都跟大量的Broker通信,减少通信开销,同时也降低了分配难度,保证每个Partition里的数据可以被有序消费。

Consumer均衡方法:

  如果某Consumer Group中Consumer(每个Consumer只创建1个MessageStream)数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据;如果Consumer的数量和Partition数量相同,则正好一个Consumer消费一个Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息。当添加、删除Consumer时,会触发Consumer的Rebalance算法,重新分配每个Consumer消费的Partition。

  Consumer Rebalance的算法如下:

 Kafka框架-应用场景:

  消息队列:比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。

  网站活动跟踪:跟踪用户浏览页面、搜索以及其他行为,以发布-订阅的模式实时记录到对应的topic里。再做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

  日志收集:服务器上收集日志文件,抽象成一个个日志或事件的信息流,Kafka处理过程延迟低,更容易支持多数据源和分布式数据处理。

  流处理:保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户将那些从原始topic来的数据进行阶段性处理、汇总、扩充或者以其他的方式转换到新的topic下再继续后面的处理。

原文地址:https://www.cnblogs.com/fengbing9891/p/8417995.html