Writing Messages

RPC request logs while producing a message

Leader node

[2019-09-25 19:40:22,266] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8855) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,266] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,267] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,610] INFO Handling request:RequestHeader(apiKey=METADATA, apiVersion=5, clientId=producer-1, correlationId=34) -- {topics=[test.vv19],allow_auto_topic_creation=true} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)


********************************
[2019-09-25 19:40:22,769] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8856) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=35,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
********************************

[2019-09-25 19:40:22,769] INFO testEnter0006-replica:0 newLogEndOffset:35 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,769] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)

********************************
[2019-09-25 19:40:22,773] INFO Handling request:RequestHeader(apiKey=PRODUCE, apiVersion=5, clientId=producer-1, correlationId=35) -- {acks=-1,timeout=30000,partitionSizes=[test.vv19-0=79]} from connection 172.16.113.38:9094-172.16.113.38:60308-1;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,834] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
********************************



********************************
[2019-09-25 19:40:23,313] INFO testEnter0007-maybeIncrementLeaderHW:Set(35 [0 : 2765], 36 [0 : 2844]) newHighWatermark:35 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************


[2019-09-25 19:40:23,400] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8857) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,400] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 35; (com.code260.ss.KafkaTestUtils$)


********************************
[2019-09-25 19:40:23,401] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 35; (com.code260.ss.KafkaTestUtils$)
********************************


[2019-09-25 19:40:23,922] INFO Handling request:RequestHeader(apiKey=FETCH, apiVersion=5, clientId=broker-0-fetcher-0, correlationId=8858) -- {replica_id=0,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,topics=[{topic=test.vv19,partitions=[{partition=0,fetch_offset=36,log_start_offset=0,max_bytes=1048576}]}]} from connection 172.16.113.38:9094-172.16.113.38:49385-0;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,923] INFO testEnter0006-replica:0 newLogEndOffset:36 oldLogEndOffsetMetadata 36; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,923] INFO testEnter0007-maybeIncrementLeaderHW:Set(36 [0 : 2844]) newHighWatermark:36 oldHighWatermark 36; (com.code260.ss.KafkaTestUtils$)

Call stack when the leader updates the follower LEO it tracks (testEnter0006)
This update also happens while handling a fetch request sent by the follower

Replica.logEndOffset_$eq(LogOffsetMetadata) line: 98	
Replica.updateLogReadResult(LogReadResult) line: 83	
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 276	
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314	
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308	
TraversableLike$$anonfun$map$1.apply(A) line: 234	
TraversableLike$$anonfun$map$1.apply(Object) line: 234	
ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234	
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104	
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308	
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799	
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803	
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597	

Call stack when the leader's HW is updated (testEnter0007-maybeIncrementLeaderHW)
Updated while handling a fetch request sent by the follower

Partition.kafka$cluster$Partition$$maybeIncrementLeaderHW(Replica, long) line: 396	
Partition$$anonfun$maybeExpandIsr$1.apply$mcZ$sp() line: 325	
Partition$$anonfun$maybeExpandIsr$1.apply() line: 309	
Partition$$anonfun$maybeExpandIsr$1.apply() line: 309	
CoreUtils$.inLock(Lock, Function0<T>) line: 217	
CoreUtils$.inWriteLock(ReadWriteLock, Function0<T>) line: 225	
Partition.maybeExpandIsr(int, LogReadResult) line: 307	
Partition.updateReplicaLogReadResult(Replica, LogReadResult) line: 283	
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Tuple2<TopicPartition,LogReadResult>) line: 1314	
ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(Object) line: 1308	
TraversableLike$$anonfun$map$1.apply(A) line: 234	
TraversableLike$$anonfun$map$1.apply(Object) line: 234	
ResizableArray$class.foreach(ResizableArray, Function1) line: 59	
ArrayBuffer<A>.foreach(Function1<A,U>) line: 48	
TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) line: 234	
ArrayBuffer<A>(AbstractTraversable<A>).map(Function1<A,B>, CanBuildFrom<Traversable<A>,B,That>) line: 104	
ReplicaManager.updateFollowerLogReadResults(int, Seq<Tuple2<TopicPartition,LogReadResult>>) line: 1308	
ReplicaManager.readFromLog$1(int, int, boolean, Seq, ReplicaQuota, IsolationLevel, boolean, boolean, boolean) line: 799	
ReplicaManager.fetchMessages(long, int, int, int, boolean, Seq<Tuple2<TopicPartition,PartitionData>>, ReplicaQuota, Function1<Seq<Tuple2<TopicPartition,FetchPartitionData>>,BoxedUnit>, IsolationLevel) line: 803	
KafkaApis.handleFetchRequest(RequestChannel$Request) line: 597	
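
The leader-side log lines and the two call stacks above can be read as one small state machine: a follower fetch reports the follower's LEO through fetch_offset (testEnter0006), and the leader then reconsiders its HW (testEnter0007). Below is a minimal sketch of that reading, not Kafka's actual code. It assumes the candidate HW is the smallest LEO among the tracked replicas and that the HW only moves forward. LeaderHwSketch, State, leaderAppend and followerFetch are hypothetical names, and the leader id 1 used in the usage note is made up for illustration.

```scala
object LeaderHwSketch {
  // Hypothetical, simplified leader state: replica id -> LEO, plus the current HW.
  final case class State(leos: Map[Int, Long], highWatermark: Long)

  // Assumption: the candidate HW is the minimum LEO across the tracked replicas,
  // and the HW is only ever moved forward.
  def maybeIncrementHw(s: State): State = {
    val candidate = s.leos.values.min
    if (candidate > s.highWatermark) s.copy(highWatermark = candidate) else s
  }

  // The leader appends one record locally, so its own LEO grows by one.
  def leaderAppend(s: State, leaderId: Int): State =
    maybeIncrementHw(s.copy(leos = s.leos.updated(leaderId, s.leos(leaderId) + 1)))

  // A follower fetch at fetchOffset tells the leader that follower's LEO.
  def followerFetch(s: State, followerId: Int, fetchOffset: Long): State =
    maybeIncrementHw(s.copy(leos = s.leos.updated(followerId, fetchOffset)))
}
```

Starting from State(Map(1 -> 35L, 0 -> 35L), 35L), leaderAppend leaves the HW at 35, which lines up with the Set(35 ..., 36 ...) log line, and a later followerFetch(_, 0, 36L) moves it to 36, which lines up with the fetch at correlationId 8857.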


Follower node

[2019-09-25 19:40:22,767] INFO testEnter0005-Received response:apikey:FETCH correlationId 8855; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,767] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:22,768] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)


********************************
[2019-09-25 19:40:23,316] INFO testEnter0005-Received response:apikey:FETCH correlationId 8856; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,369] INFO Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}, Current: {epoch:25, offset34} for Partition: test.vv19-0. Cache now contains 11 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2019-09-25 19:40:23,396] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,397] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
********************************



********************************
[2019-09-25 19:40:23,920] INFO testEnter0005-Received response:apikey:FETCH correlationId 8857; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,921] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [35]lso.messageOffset: [35] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:23,922] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
********************************



[2019-09-25 19:40:24,426] INFO testEnter0005-Received response:apikey:FETCH correlationId 8858; (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:24,427] INFO testEnter0002-001topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
[2019-09-25 19:40:24,428] INFO testEnter0002-002topicPartition:test.vv19:0hwm.messageOffset: [36]lso.messageOffset: [36] (com.code260.ss.KafkaTestUtils$)
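
In the follower log above, the HW stays at 35 after response 8856 and only becomes 36 after response 8857, one fetch round after the record itself arrived. The sketch below is one way to reproduce that lag. It assumes the follower first appends whatever records the response carries and then sets its HW to the smaller of its own LEO and the leader HW carried in the response. FollowerHwSketch, Follower and FetchResponse are hypothetical names, not Kafka classes.

```scala
object FollowerHwSketch {
  // Hypothetical, simplified follower state.
  final case class Follower(logEndOffset: Long, highWatermark: Long)

  // Hypothetical shape of what one fetch response carries for a partition.
  final case class FetchResponse(recordCount: Int, leaderHighWatermark: Long)

  // Assumption: append the fetched records first, then take
  // min(own LEO, leader HW from the response) as the follower HW.
  def onFetchResponse(f: Follower, r: FetchResponse): Follower = {
    val newLeo = f.logEndOffset + r.recordCount
    Follower(newLeo, math.min(newLeo, r.leaderHighWatermark))
  }
}
```

With Follower(35, 35), a response carrying one record and leader HW 35 gives LEO 36 and HW 35; the next, empty response carrying leader HW 36 gives HW 36.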

Request handling

handleProduceRequest ProduceRequest
ReplicaManager.appendRecords

  • timeout: Long // from the request body
  • requiredAcks: Short // from the request body
  • internalTopicsAllowed: Boolean // whether request.header.clientId is __admin_client
  • isFromClient: Boolean // always passed as true
  • entriesPerPartition: Map[TopicPartition, MemoryRecords]
  • responseCallback: Map[TopicPartition, ProduceResponse.PartitionResponse] => Unit
  • delayedProduceLock: Option[Lock] // not passed
  • processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit

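Main log-write logic:
kafka.log.Log.append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int), line 623; worth reading closely

During a message write: the relationships among Log, LogSegment, FileRecords, MemoryRecords, File, and LogOffsetMetadata

How is the offset of a new message generated? After each append, the next offset is updated: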

 // increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)  
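
To make the offset bookkeeping concrete, here is a minimal sketch of a log that only tracks the next offset to hand out. It is an illustration under simplifying assumptions, not Kafka's Log class: OffsetAssignSketch, TinyLog and this two-field AppendInfo are hypothetical, and records are reduced to a count.

```scala
object OffsetAssignSketch {
  // Hypothetical summary of one append, loosely modelled on the fields named in the text.
  final case class AppendInfo(firstOffset: Long, lastOffset: Long)

  // Hypothetical stand-in for a log that only remembers the next offset to assign.
  final class TinyLog(private var nextOffset: Long) {
    def logEndOffset: Long = nextOffset

    // Assign consecutive offsets to `count` records.
    def append(count: Int): AppendInfo = {
      require(count >= 1, "a batch must contain at least one record")
      val first = nextOffset
      val last = first + count - 1
      nextOffset = last + 1 // mirrors updateLogEndOffset(appendInfo.lastOffset + 1)
      AppendInfo(first, last)
    }
  }
}
```

Appending one record to new TinyLog(35L) yields firstOffset and lastOffset of 35 and leaves logEndOffset at 36, the same 35 to 36 step that shows up in the logs.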

How the leader node handles the relevant offsets while processing a produce request

KafkaApis handleProduceRequest
ReplicaManager appendRecords
ReplicaManager appendToLocalLog
processingStatsCallback // KafkaApis def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit
responseCallback // KafkaApis def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse])
ProduceRequest clearPartitionRecords

ReplicaManager appendToLocalLog
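Collect produce metrics (in preparation for computing the write rate), both globally and per topic.
Check whether the messages are being sent to an internal topic.
Write the messages to the leader: partition.appendRecordsToLeader
val info = log.appendAsLeader
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
(info, maybeIncrementLeaderHW(leaderReplica))
Update firstOffset, lastOffset, and numAppendedMessages.
Collect produce metrics (message count and message size), both globally and per topic.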

Log.append
Allocate the message offset: val offset = new LongRef(nextOffsetMetadata.messageOffset)
Update firstOffset: appendInfo.firstOffset = offset.value
Update lastOffset: appendInfo.lastOffset = offset.value - 1
If any message fails validation, collect metrics for the rejected messages.
Update the offset recorded for the leader epoch: leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset). The update happens only when the epoch changes; LeaderEpochCheckpoint flushes it straight to disk and forces it down with an FD sync, fileOutputStream.getFD().sync().
segment.append
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata
producerStateManager.update(producerAppendInfo)
Handle some of the indexes for transactional messages.
producerStateManager.updateMapEndOffset
updateLogEndOffset
updateFirstUnstableOffset
flush() if needed
Return appendInfo
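
The leader-epoch step in the list above is the one that produced the "Updated PartitionLeaderEpoch. New: {epoch:27, offset:35}" log line. A minimal in-memory sketch of the "only when the epoch changes" rule follows. It assumes an entry is recorded only when the epoch is newer than the latest cached one; EpochCacheSketch and EpochCache are hypothetical names, and flushCount merely stands in for the checkpoint flush and FD sync, which are not modelled.

```scala
import scala.collection.mutable.ListBuffer

object EpochCacheSketch {
  final case class EpochEntry(epoch: Int, startOffset: Long)

  // Hypothetical in-memory cache; flushCount stands in for the flush to disk.
  final class EpochCache {
    private val epochs = ListBuffer.empty[EpochEntry]
    var flushCount = 0

    def entries: List[EpochEntry] = epochs.toList

    // Assumption: record (epoch, offset) only when the epoch is newer than the
    // latest cached epoch; repeated assigns for the same epoch are no-ops.
    def assign(epoch: Int, offset: Long): Unit = {
      val isNewer = epochs.lastOption.forall(_.epoch < epoch)
      if (epoch >= 0 && isNewer) {
        epochs += EpochEntry(epoch, offset)
        flushCount += 1
      }
    }
  }
}
```

Calling assign(25, 34), assign(27, 35) and then assign(27, 36) leaves two entries and two flushes, since the third call carries an epoch that is already cached.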

processingStatsCallback
Updates the produceMessageConversionsRate metric from conversionCount, both per topic and globally. conversionCount counts conversions of messages from a newer format to an older one. The conversion logic is in org.apache.kafka.common.record.AbstractRecords.downConvert(Iterable<? extends RecordBatch>, byte, long, Time), and the call originates in org.apache.kafka.clients.producer.internals.Sender.sendProduceRequest(long, int, short, int, List).

responseCallback
Checks whether there are errors and logs them if so; handles throttling.

Partition.maybeIncrementLeaderHW analysis

Original article: https://www.cnblogs.com/simoncook/p/11809443.html