producer
acks 0, 1, -1
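0: the client does not wait for a response; if the broker hits a write error, it simply closes the connection
1: the partition leader responds as soon as it has written to its FileChannel, without waiting for followers
-1: works together with the min.insync.replicas parameter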
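As a rough illustration of the three acks levels, the sketch below builds producer properties using only java.util.Properties, so it runs without the Kafka client on the classpath. The object name `ProducerAcksSketch`, the helper `producerProps`, and the broker address are assumptions made for this example; the property keys are the standard producer config names, and "all" is the alias for -1.

```scala
import java.util.Properties

object ProducerAcksSketch {
  // Accepted acks values; "all" and "-1" mean the same thing.
  private val allowed = Set("0", "1", "all", "-1")

  // Builds producer properties for the given acks level.
  def producerProps(acks: String): Properties = {
    require(allowed.contains(acks), s"unsupported acks: $acks")
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // assumed address
    props.setProperty("acks", acks)
    props.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }
}
```

Passing the result to a KafkaProducer constructor would be the next step in a real application; that part is left out here to keep the sketch dependency-free.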
broker
min.insync.replicas = 1 // this parameter can also be set at the topic level
    // kafka.cluster.Partition#checkEnoughReplicasReachOffset
    def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
      leaderReplicaIfLocal match {
        case Some(leaderReplica) =>
          // keep the current immutable replica list reference
          val curInSyncReplicas = inSyncReplicas

          def numAcks = curInSyncReplicas.count { r =>
            if (!r.isLocal)
              if (r.logEndOffset.messageOffset >= requiredOffset) {
                trace(s"Replica ${r.brokerId} received offset $requiredOffset")
                true
              }
              else
                false
            else
              true /* also count the local (leader) replica */
          }

          trace(s"$numAcks acks satisfied with acks = -1")

          val minIsr = leaderReplica.log.get.config.minInSyncReplicas

          if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
            /*
             * The topic may be configured not to accept messages if there are not enough replicas in ISR
             * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
             */
            if (minIsr <= curInSyncReplicas.size)
              (true, Errors.NONE)
            else
              (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
          } else
            (false, Errors.NONE)
        case None =>
          (false, Errors.NOT_LEADER_FOR_PARTITION)
      }
    }
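The broker runs checkEnoughReplicasReachOffset only when the producer's acks = -1.
If the leader's HW is greater than or equal to requiredOffset and the current ISR size is at least minIsr, the check returns (true, Errors.NONE). If the HW has reached requiredOffset but the ISR has shrunk below minIsr, it returns (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND). If the HW has not yet reached requiredOffset, it returns (false, Errors.NONE) and the request keeps waiting.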
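The decision made by checkEnoughReplicasReachOffset can be reduced to a pure function of four numbers. The sketch below is a simplified model, not the broker code: the object name `AcksAllCheckSketch` and the function `check` are made up for this example, and error codes are plain strings rather than Kafka's Errors enum.

```scala
object AcksAllCheckSketch {
  // Returns (requestCanComplete, errorName), following the same branches
  // as the leader-side check for acks = -1.
  def check(leaderHw: Long, requiredOffset: Long, isrSize: Int, minIsr: Int): (Boolean, String) =
    if (leaderHw >= requiredOffset) {
      // The data is committed; only the ISR size decides the error code.
      if (minIsr <= isrSize) (true, "NONE")
      else (true, "NOT_ENOUGH_REPLICAS_AFTER_APPEND")
    } else {
      // HW has not reached the required offset yet: keep waiting.
      (false, "NONE")
    }
}
```

For instance, `AcksAllCheckSketch.check(10L, 10L, 1, 2)` completes the request but reports NOT_ENOUGH_REPLICAS_AFTER_APPEND, because the HW reached the offset while the ISR had already shrunk below minIsr.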
replica.lag.time.max.ms = 10000
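How does the broker decide that a replica is no longer in the ISR? If a follower has not caught up with the leader for more than 10 s (replica.lag.time.max.ms), it is removed from the ISR.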
    // kafka.cluster.Partition#getOutOfSyncReplicas
    def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
      // exclude the leader
      val candidateReplicas = inSyncReplicas - leaderReplica

      // current time - lastCaughtUpTimeMs > replica.lag.time.max.ms
      val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
      if (laggingReplicas.nonEmpty)
        debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

      laggingReplicas
    }

    // kafka.cluster.Replica#updateLogReadResult
    def updateLogReadResult(logReadResult: LogReadResult) {
      // the follower's fetch offset has reached the leader's current LEO
      if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
        _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
      // the follower's fetch offset has reached the leader's LEO as of the previous fetch
      else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
        _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

      logStartOffset = logReadResult.followerLogStartOffset
      logEndOffset = logReadResult.info.fetchOffsetMetadata
      lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
      lastFetchTimeMs = logReadResult.fetchTimeMs
    }
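To see how lastCaughtUpTimeMs and the lag check interact, the following sketch replays the same two rules on an immutable state object. It is a simplified model only: `IsrLagSketch`, `FollowerState`, `onFetch`, and `outOfSync` are hypothetical names for this example and carry just the fields the lag check needs.

```scala
object IsrLagSketch {
  // Hypothetical stand-in for the per-follower state kept by the leader.
  final case class FollowerState(
      brokerId: Int,
      lastCaughtUpTimeMs: Long = 0L,
      lastFetchLeaderLeo: Long = 0L,
      lastFetchTimeMs: Long = 0L)

  // Applies one fetch from the follower and returns the updated state.
  def onFetch(s: FollowerState, fetchOffset: Long, leaderLeo: Long, fetchTimeMs: Long): FollowerState = {
    val caughtUp =
      if (fetchOffset >= leaderLeo)
        math.max(s.lastCaughtUpTimeMs, fetchTimeMs)       // caught up right now
      else if (fetchOffset >= s.lastFetchLeaderLeo)
        math.max(s.lastCaughtUpTimeMs, s.lastFetchTimeMs) // was caught up as of the previous fetch
      else
        s.lastCaughtUpTimeMs                              // still behind
    s.copy(lastCaughtUpTimeMs = caughtUp, lastFetchLeaderLeo = leaderLeo, lastFetchTimeMs = fetchTimeMs)
  }

  // Broker ids whose last caught-up time is older than maxLagMs.
  def outOfSync(followers: Seq[FollowerState], nowMs: Long, maxLagMs: Long): Set[Int] =
    followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs).map(_.brokerId).toSet
}
```

A follower that keeps fetching but never reaches the leader's LEO, current or previous, keeps its old lastCaughtUpTimeMs and drops out of the ISR once that timestamp is more than maxLagMs in the past.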
Typical configuration:
3 replicas per partition, acks = -1, min.insync.replicas = 2
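The arithmetic behind this configuration is small enough to write down. The sketch below uses made-up names (`DurabilitySketch`, `tolerableFailures`, `acceptsWrites`) and models only the ISR-size rule for acks = -1 writes.

```scala
object DurabilitySketch {
  // How many replicas may leave the ISR while acks = -1 writes are still accepted.
  def tolerableFailures(replicationFactor: Int, minInsyncReplicas: Int): Int = {
    require(minInsyncReplicas >= 1 && minInsyncReplicas <= replicationFactor,
      "min.insync.replicas must be between 1 and the replication factor")
    replicationFactor - minInsyncReplicas
  }

  // Whether an acks = -1 write is accepted for the given ISR size.
  def acceptsWrites(isrSize: Int, minInsyncReplicas: Int): Boolean =
    isrSize >= minInsyncReplicas
}
```

With 3 replicas and min.insync.replicas = 2, one replica can fall out of the ISR and acks = -1 writes still succeed; with only one replica left in the ISR they are rejected.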
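Updating the high watermark (HW): the leader replica takes the minimum LEO across the in-sync replicas; a follower replica takes min(leo, fetchResponse.hw).
The leader updates its HW first; followers then fetch messages and update their own HW from the leader's HW carried in the fetch response. If a follower crashes and restarts before it has updated its HW, it truncates its log to its HW.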
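The two HW rules can be sketched as a pair of pure functions. This is a simplified model with hypothetical names (`HighWatermarkSketch`, `leaderHw`, `followerHw`); the caller is assumed to pass in the LEOs of exactly those replicas the leader takes into account.

```scala
object HighWatermarkSketch {
  // Leader side: HW is the smallest LEO among the replicas being considered.
  def leaderHw(replicaLeos: Seq[Long]): Long = {
    require(replicaLeos.nonEmpty, "at least the leader's own LEO is required")
    replicaLeos.min
  }

  // Follower side: HW never exceeds the follower's own LEO
  // nor the leader HW carried in the fetch response.
  def followerHw(followerLeo: Long, leaderHwInFetchResponse: Long): Long =
    math.min(followerLeo, leaderHwInFetchResponse)
}
```

Because the follower only learns the leader's HW from a later fetch response, its HW can trail the leader's by one round trip, which is the window in which truncating to HW after a restart can discard messages.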
What is the leader epoch for?
It fixes the message loss and log inconsistency that truncating the log by HW could cause.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
The follower sends an OffsetsForLeaderEpochRequest to the leader to get the leader's LEO for the given epoch, then truncates its log according to that offset.
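A minimal sketch of that lookup, loosely following the description in KIP-101 and not the broker implementation: the leader keeps one entry per epoch with the first offset written in it, and the end offset of an epoch is the start offset of the next larger epoch, or the leader's LEO if there is none. All names here (`LeaderEpochSketch`, `EpochEntry`, `endOffsetFor`, `truncationOffset`) are hypothetical.

```scala
object LeaderEpochSketch {
  // One entry per leader epoch: the epoch number and the first offset written in it.
  final case class EpochEntry(epoch: Int, startOffset: Long)

  // Leader side: end offset of `epoch` is the start offset of the next larger
  // epoch, or the leader's LEO when `epoch` is the latest one.
  def endOffsetFor(cache: Seq[EpochEntry], leaderLeo: Long, epoch: Int): Long =
    cache.sortBy(_.epoch).find(_.epoch > epoch).map(_.startOffset).getOrElse(leaderLeo)

  // Follower side: never truncate past its own LEO.
  def truncationOffset(followerLeo: Long, leaderEndOffset: Long): Long =
    math.min(followerLeo, leaderEndOffset)
}
```

For example, if the leader's cache is (epoch 0, start 0) and (epoch 1, start 5), a follower whose last epoch is 0 and whose LEO is 7 gets end offset 5 and truncates to 5, removing the two records it wrote that the new leader never had.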