Kafka的消息格式

Commit Log

Kafka储存消息的文件被它叫做log,按照Kafka文档的说法是:

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log

这反应出来的Kafka的行为是:消息被不断地append到文件末尾,而且消息是不可变的。

这种行为源于Kafka想要实现的功能:高吞吐量,多副本,消息持久化。这种简单的log形式的文件结构能够更好地实现这些功能,不过也会在其它方面有所欠缺,比如检索消息的能力。

而Kafka的行为也决定了它的消息格式。对于Kafka来说,消息的主体部分的格式在网络传输中和磁盘上是一致的,也就是说消息的主体部分可以直接从网络读取的字节buffer中写入到文件(部分情况下),也可以直接从文件中copy到网络,而不需要在程序中再加工,这有利于降低服务器端的开销,以及提高IO速度(比如使用zero-copy的传输)。

这也就决定了Kafka的消息格式必须是适于被直接append到文件中的。当然啥都可以append到文件后面,问题在于怎么从文件中拆分出来一条条记录。

记录的划分以及消息的格式

对于日志来说,一条记录以" "结尾,或者通过其它特定的分隔符分隔,这样就可以从文件中拆分出一条一条的记录,不过这种格式更适用于文本,对于Kafka来说,需要的是二进制的格式。所以,Kafka使用了另一种经典的格式:在消息前面固定长度的几个字节记录下这条消息的大小(以byte记),所以Kafka的记录格式变成了:

Offset MessageSize Message

消息被以这样格式append到文件里,在读的时候通过MessageSize可以确定一条消息的边界。

需要注意的是,在Kafka的文档以及源码中,消息(Message)并不包括它的offset。Kafka的log是由一条一条的记录构成的,Kafka并没有给这种记录起个专门的名字,但是需要记住的是这个“记录”并不等于"Message"。Offset MessageSize Message加在一起,构成一条记录。而在Kafka Protocol中,Message具体的格式为

Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

各个部分的含义是

Field

Description

Attributes

This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0.

Offset

This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.

MessageSet

之所以要强调记录与Message的区别,是为了更好地理解MessageSet的概念。Kafka protocol里对于MessageSet的定义是这样的

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32

也就是说MessageSet是由多条记录组成的,而不是消息,这就决定了一个MessageSet实际上不需要借助其它信息就可以从它对应的字节流中切分出消息,而这决定了更重要的性质:Kafka的压缩是以MessageSet为单位的。而以MessageSet为单位压缩,决定了对于压缩后的MessageSet,不需要在它的外部记录这个MessageSet的结构,也就决定了Kafka的消息是可以递归包含的,也就是前边"value"字段的说明“Kafka supports recursive messages in which case this may itself contain a message set"。

具体地说,对于Kafka来说,可以对一个MessageSet做为整体压缩,把压缩后得到的字节数组作为一条Message的value。于是,Message既可以表示未压缩的单条消息,也可以表示压缩后的MessageSet。

压缩后的消息的读取

就看Message头部的Attributes里的压缩格式标识。说到这个,得说下递归包含的事情,理论上,一个压缩的的MessageSet里的一个Message可能会是另一个压缩后的MessageSet,或者包含更深层的MessageSet。但是实际上,Kafka中的一个Message最多只含有一个MessageSet。从Message中读取MessageSet的逻辑,可以在ByteBufferMessageSet的internalIterator方法中找到:

        if(isShallow) { //是否要进行深层迭代
          new MessageAndOffset(newMessage, offset)
        } else { //如果要深层迭代的话
          newMessage.compressionCodec match {
            case NoCompressionCodec =>
              innerIter = null
              new MessageAndOffset(newMessage, offset) //如果这个Message没有压缩,就直接把它作为一个Message返回
            case _ =>
              innerIter = ByteBufferMessageSet.deepIterator(newMessage) //如果这个Message采用了压缩,就对它进行深层迭代
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()
          }
        }

而ByteBufferMessageSet的deepIterator方法就是对这个Message的value进行解压,然后从中按照Offset MessageSize Message的格式读取一条条记录,对于这次读取的Message,就不再进行深层迭代了。下面是deepIterator的makeNext方法,它被不断调用以生成迭代器的元素

      override def makeNext(): MessageAndOffset = {
        try {
          // read the offset
          val offset = compressed.readLong()
          // read record size
          val size = compressed.readInt()

          if (size < Message.MinHeaderSize)
            throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")

          // read the record into an intermediate record buffer
          // and hence has to do extra copy
          val bufferArray = new Array[Byte](size)
          compressed.readFully(bufferArray, 0, size)
          val buffer = ByteBuffer.wrap(bufferArray)

          val newMessage = new Message(buffer)

          // the decompressed message should not be a wrapper message since we do not allow nested compression
          new MessageAndOffset(newMessage, offset)
        } catch {
          case eofe: EOFException =>
            compressed.close()
            allDone()
          case ioe: IOException =>
            throw new KafkaException(ioe)
        }
      }

KAFKA-1718

至于一个MessageSet中不能包含多个压缩后的Message(压缩后的Message也就是以压缩后的MessageSet作为value的Message),Kafka Protocol中是这么说的

The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).

KAFKA-1718就是在Protocol里添加这么一个特殊说明的原因。事情是这样的:

报各这个问题的人是Go语言client的作者,他发现自己发的Message明显没有过大,但是发生了MessageSizeTooLargeException。后来跟其它人讨论,发现是因为broker端在调用Log.append时,会把传送给这个方法的MessageSet解压开,然后再组合成一个压缩后的MessageSet(ByteBufferMessageSet)。而Go语言的客户端发送的MessageSet中包含了多个压缩后的Message,这样即使发送时的Message不会超过message.max.bytes的限制,但是broker端再次生成的Message就超过了这个限制。所以,Kafka Protocol对这种情况做了特殊说明:The outer MessageSet should contain only one compressed "Message"。

Compressed Message的offset

即然可以把压缩后的MessageSet作为Message的value,那么这个Message的offset该如何设置呢?

这个offset的值只有两种可能:1, 被压缩的MessageSet里Message的最大offset; 2, 被压缩的MessageSet里Message的最小offset.

这两种取值没有功能的不同,只有效率的不同。

由于FetchRequest协议中的offset是要求broker提供大于等于这个offset的消息,因此broker会检查log,找到符合条件的,然后传输出去。那么由于FetchRequest中的offset位置的消息可位于一个compressed message中,所以broker需要确定一个compressed Message是否需要被包含在respone中。

  • 如果compressed Message的offset是它包含的MessageSet的最小offset。那么,我们对于这个Message是否应包含在response中,无法给出"是”或"否“的回答。比如FetchRequest中指明的开始读取的offset是14,而一个compressed Message的offset是13,那么这个Message中可能包含offset为14的消息,也可能不包含。
  • 如果compressed Message的offset是它包含的MessageSet的最大offset,那么,可以根据这个offset确定这个Message“不应该”包含在response中。比如FetchRequest中指明的开始读取的offset是14,那么如果一个compressed Message的offset是13,那它就不该被包含在response中。而当我们顺序排除这种不符合条件的Message,就可以找到第一个应该被包含在response中的Message(压缩或者未压缩), 从它开始读取。

在第一种情况下(最小offset),我们尽管可以通过连续的两个Message确定第一个Message的offset范围,但是这样在读取时需要在读取第二个Message的offset之后跳回到第一个Message,  这通常会使得最近一次读(也就读第二个offset)的文件系统的缓存失效。而且逻辑比第二种情况更复杂。在第二种情况下,broker只需要找到第一个其offset大于或等于目标offset的Message,从它可以读取即可,而且也通常能利用到文件系统缓存,因为offset和消息内容有可能在同一个缓存块中。

在处理FetchRequest时,broker的逻辑也正是如此。对FetchRequest的处理会调用到Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None)方法,然后调用到LogSegment的read方法,它的之后的调用有很多,所有不贴代码了,它的注释说明了读取的逻辑

* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified

即,返回的MessageSet的第一条Message的offset >= startOffset。

而在broker给compressed Message赋予offset时,其逻辑也是赋予其包含的messages中的最大offset。这段逻辑在ByteBufferMessageSet的create方法中:

      messageWriter.write(codec = compressionCodec) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) //创建压缩流
        try {
          for (message <- messages) {
            offset = offsetCounter.getAndIncrement //offsetCounter是一个AtomicLong,使用它的当前值作为这条Message的offset,然后+1作为下一条消息的offset
            output.writeLong(offset)//写入这条日志记录的offset
            output.writeInt(message.size)//写入这条日志记录的大小
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) //写入这条记录的Message
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      writeMessage(buffer, messageWriter, offset)//以最后一个Message的offset作为这个compressed Message的offset

Validate Message

什么需要验证?

先看一下消息的哪些特征需要被验证。

首先,网络传输过程中,数据可能会产生错误,即使是写在磁盘上的消息,也可能会由于磁盘的问题产生错误。因此,broker对接收到的消息需要验证其完整性。这里的消息就是前边协议里定义的Message。对于消息完整性的检测,是使用CRC32校验,但是并不是对消息的所有部分计算CRC,而是对Message的Crc部分以后的部分,不包括记录的offset和MessageSize部分。把offset和MessageSize加到CRC计算中,可以对完整性有更强的估证,但是坏处在于这两个部分在消息由producer到达broker以后,会被broker重写,因此如果把它们计算在crc里边,就需要在broker端重新计算crc32,这样会带来额外的开销。

CRC32没有检测出错误的概率在0.0047%以下,加上TCP本身也有校验机制,不能检测出错误的概率就很小了(这个还需要再仔细算一下)。

除了消息的完整性,还需要对消息的合规性进行检验,主要是检验offset是否是单调增长的,以及MessageSize是超过了最大值。

这里检验时使用的MessageSize就不是Message本身的大小了,而是一个记录的大小,包括offset和MessageSize,这个也挺奇怪的,有必要非拉上这俩吗?

而且在broker端检验producer发来的MessageSet时,也没必要检验它的offset是否是单调增长的呀,毕竟leader还要对Message的offset重新赋值。而follower是从leader处拉取的,如果网络或者磁盘出错,通过对offset的单调性检查也可能会漏掉出错了的记录,对于consumer来说也是同理。所以这里有点奇怪。

何时需要验证?

在broker把收到的producer request里的MessageSet append到Log之前,以及consumer和follower获取消息之后,都需要进行校验。

这种情况分成两种:

1. broker和consumer把收到的消息append到log之前

2. consumser收到消息后

第一种情况都是在调用Log#append时进行检验的。

如何验证?

先看下Log#append的方法声明

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo

在replica的fetcher线程调用append方法时,会把assignOffsets设成false,而leader处理produce request时,会把assignOffsets设成true。

下面append方法的一部分代码

    val appendInfo = analyzeAndValidateMessageSet(messages) //验证消息
    
    // if we have any valid messages, append them to the log
    if(appendInfo.shallowCount == 0)
      return appendInfo
      
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validMessages = trimInvalidBytes(messages, appendInfo)//trim掉不可用的部分或者残缺的消息

    try {
      // they are valid, insert them in the log
      lock synchronized {
        appendInfo.firstOffset = nextOffsetMetadata.messageOffset 

       if(assignOffsets) { //如果需要重新赋予offset
          // assign offsets to the message set
          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
          try {
            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) //验证消息并且赋予offset
          } catch {
            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
          }
          appendInfo.lastOffset = offset.get - 1
        } else {
          // we are taking the offsets we are given
          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
            throw new IllegalArgumentException("Out of order offsets found in " + messages)
        }

        // re-validate message sizes since after re-compression some may exceed the limit 对压缩后消息重新验证MessageSize是否超过了允许的最大值
        for(messageAndOffset <- validMessages.shallowIterator) {
          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
            // we record the original message set size instead of trimmed size
            // to be consistent with pre-compression bytesRejectedRate recording
            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
          }
        }

注意到对MessageSize验证了两次,第二次是对重新压缩后的消息。KAFKA-1718里提到MessageSizeToLargeException,就是在这时候检测出来的。

初步检验:analyzeAndValidateMessageSet

具体的检验消息完整性和offset单调增长的逻辑在analyzeAndValidateMessageSet方法里。这个方法的实现里,需要注意几点:

  1. 它是使用ByteBufferMessageSize的shallowIterator来对这个MessageSet的消息进行迭代,这也意味着并不会对compressed message里边的MessageSet解压后再进行检验,而是把comprssed message作为单个Message进行检验。
  2. 它计算checksum时,是计算的MagicByte及其以后的内容。
     def computeChecksum(): Long = 
        CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)
  3. 它比较的是entrySize与MaxMessageSize的大小,来确定这个消息是否太大
      def entrySize(message: Message): Int = LogOverhead + message.size
    
    ---------------------------------
    
      val MessageSizeLength = 4
      val OffsetLength = 8
      val LogOverhead = MessageSizeLength + OffsetLength
  4. 它返回的LogAppendInfo中会包括一个targetCodec,指明这个MessageSet将要使用的压缩方式。leader处理produce request时,将使用这个压缩方式重新压缩整个MessageSet。
        val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

    config.compressionType就是broker配置里的compression.type的值,如果它是“producer", 就会使用producer request使用压缩方式,否则就使用config.compressionType指明的压缩方式。注意如果一个MessageSet里的Message采用了不同的压缩方式,最后被当成sourceCodec的是最后一个压缩了的消息的压缩方式。

再次检验并且赋予offset :validateMessagesAndAssignOffsets

只有leader处理produce request时,会调用ByteBufferMessageSet的这个方法。 它不会检测analyzeAndValidateMessageSet已经检测的内容,但是会把这个MessageSet进行深度遍历(即如果它里边的消息是压缩后,就把这个消息解压开再遍历),这样它就能做analyzeAndValidateMessageSet不能进行的检测:对于compacted topic检测其key是否为空,如果为空就抛出InvalidMessageException。

另外,它会把深度遍历后获得的Message放在一起重新压缩。

如果MessageSet的尾部不是完整的Message呢?

这是在获取ByteBufferMessageSet的iternalIterator时候处理的。

      def makeNextOuter: MessageAndOffset = {
        // if there isn't at least an offset and size, we are done
        if (topIter.remaining < 12)
          return allDone()
        val offset = topIter.getLong()
        val size = topIter.getInt()
        if(size < Message.MinHeaderSize)
          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")

        // we have an incomplete message
        if(topIter.remaining < size)
          return allDone()
    .    ...
    }    

注意返回allDone()和抛出InvalidMessageException的时机。

  • 如果这个MessageSet剩下部分不到12bytes,那剩下的部分就是下一个MessageSet头部的一部分,是没法处理的,也是没办法检验的,因此就返回allDone。
  • 如果够12bytes,就可以读出offset和MessageSize。MessageSize至少会大于Message头里边的那些crc、Attributes, MagicBytes等加起来的大小,因此如果MessageSize比这个还小,就肯定是个entry有问题,所以就抛出异常。这里的问题在于,即使MessageSet最后的那个Message是不完整的,只要MessageSize有问题,也会抛异常,而不是忽略这个不完整的Message。(这个可能是没考虑到,也可能是有别的考虑,不过无论怎么处理最后的这个不完整的Message,都有一定的道理)。

 consumer端的验证

consumer(0.9)会检查checksum,不过是可以配置的,原因正如config里说的一样。

    public static final String CHECK_CRCS_CONFIG = "check.crcs";
    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";

config的文档说,检查checksum是为了"ensures no on-the-wire or on-disk corruption to the message occurred."即,为了保证没有在网络传输出或者磁盘存储时出现了消息的损坏。但是checksum计算时会带来开销,所以追求最佳性能,可以关掉checksum的检查。


下面来看一下几个与消息格式相关的KIP。为什么需要这些改变呢?为什么之前没有实现这些改变呢?都是因为各种折衷吧,需求与性能折衷,需求与实现所需的工作量的折衷……

下面的几个KIP可能会一起加上去,毕竟都是对消息格式的修改,不能搞冲突了。

KIP-31 - Move to relative offsets in compressed message sets

前边提到了,在leader收到ProduceRequet之后,它会解压开compressed message(也就是是这个KIP里的compressed messageset,这两说说法的确有些乱),然后给里边包含的message set的每条消息重新赋予offset。这个做法也是应该的,乍一看也没什么不好。但是问题在于,不仅是直接改个offset这么简单,在改完之后,需要重新压缩这些消息,还要计算。这么一搞,开销就大了。KIP-31就是想把这部分的性能损失降下来。(这个KIP已经是accepted状态)

做法是把在一个compressed message set里边的每个message的offset里记下当前message相对于外层的wrapper message的偏移。用汉语说这个意思比较费劲,KIP里这么说

When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.

When broker receives a compressed message, it only needs to 

    1. Decompress the message to verify the CRC and relative offset.
    2. Set outer message's base offset. The outer message's base offset will be the offset of the last inner message.  (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)

注意,这个wrapper message里记的base offset, 是它所含的message set里的最后一个message的offset。这个和当前的compressed message的offset是一致的。

然后当broker收到一个压缩后的消息时,它只需要

  • 验证CRC与realtive offset的正确性
  • 重新设定外层消息的offset,也就是base offset。

KIP-32 - Add timestamps to Kafka message

在消息里加时间戳。需要注意的是,这个KIP还在讨论中(以下的内容是基于2016年1月7日的版本)。不像上一个已经确定了。

(俺是觉得这个事情早该做了……)

首先,来看一下动机,这个提有意思

Motivation

This KIP tries to address the following issues in Kafka.

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well because of the same reason as (1).
  3. Some use cases such as streaming processing needs a timestamp in messages.

说的是这几个原因

1. Log retention会不靠谱。当前log retention是在log segment层面做的,是按照log segment的最后修改时间确定是否要删除一个log segment. 但是,当replica重分配发生时,新被分配的这个replica的log segment的修改时间会被设成当前时间。这么一来,它就不能被按照log retention想要做的那样(实际上是想把一段时间之前的消息删除)被删除。

2. 由于和1同样的原因,对于一个新创建的replica(意思应该是移动位置的replica, 并不是增加分区后新加的replica)log rolling有时候也会不靠谱。

3. 有些场景中需要消息含有时间戳,比如流处理。

感觉,貌似第三个原因才是决定性的,拥抱流处理。

接口的变化

准备在Message里加入timestamp字段

准备增加两个配置

  • message.timestamp.type 可以选CreateTime或者LogAppendTime,CreateTime就是这条消息生成的时间,是在producer端指定的。LogAppendTime就是append到log的时间(实现细节没有说明)。
  • max.message.time.difference.ms 如果选择了CreateTime, 那么只有当createTime和broker的本地时间相差在这个配置指定的差距之内,broker才会接受这条消息。

纠结之处

之前关于这个KIP的讨论主要是关于使用哪个时间, 是使用LogAppendTime(broker time),还是CreateTime(application time)。

两种都有利有弊:

The good things about LogAppendTime are: 使用LogAppendTime的好处在于

  1. Broker is more robust. Broker比起用户程序更健壮(更不容易出错,比如用户程序可能有bug,导致CreateTime设置的不正确,想一想KIP-33,如果错得离谱,索引怎么建?)
  2. Monotonically increasing. LogAppendTime是单调增长的。(但是,follower收到的消息的timestamp该怎么设呢?如果不用leader带来的,就不能确定是否monotonically increasing)
  3. Deterministic behavior for log rolling and retention.log rolling和retention的行为是确定性的。(如果按消息里的这个timestamp来决定这两个操作的行为,那么让用户指定timestamp的确挺危险的)
  4. If CreateTime is required, it can always be put into the message payload.如果需要CreateTime,可以加到消息的内容里。(这个的确是……)

The good things about CreateTime are: 使用CreateTime的好处是

  1. More intuitive to users. 更符合用户的思维(用户当然是想使用自己填进去的时间)。
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.用户可能更希望用消息被创建的时间来决定log retention的行为,而不是消息进行处理管道的时间。
  3. Immutable after entering the pipeline.这样,消息的timestamp在进入管道后就不会再改变了。

在俺看来,这两个选择的确挺纠结的。用户肯定是想用自己产生消息的时间,不然很难准确地找到一条消息。但是,如果使用用户指定的时间,broker端的行为就变得复杂了,比如,如果用户指定的时间不是单调递增的,该怎么建时间索引。但是用户产生畸形的时间,倒可以通过配置里max.message.time.difference.ms来控制。或许可以加另一个配置,允许broker在一定范围内修改CreateTime,比如最多可以更改1000ms。这样就能即使消息的timestamp单调增长,也能使用户对消息的时间的估计比较准确。不过,这样可能就需要让broker time的含义变成broker收到消息时间,而不是append到log的时间。否则就难以确定何时该拒绝无法在指定范围内修改timestamp的消息。

KIP-33 - Add a time based log index

动机:

当前按照时间戳查找offset得到的结果是非常粗粒度的,只能在log segment的级别。(对于reassigned replica就差得没谱了。)所以这个KIP提议建一个基于时间的对日志的索引,来允许按timestamp搜索消息的结果更准确。

这个KIP和KIP-32是紧密相关的。这俩KIP都在讨论过程中。

原文地址:https://www.cnblogs.com/devos/p/5100611.html