Kafka2.0生产者客户端源码分析

1 KafkaProducer 构造器

  1. 初始化参数配置。
  2. 初始化记录累加器 RecordAccumulator。
  3. 初始化 Kafka 连接 KafkaClient,发现集群的所有节点加入缓存。
  4. 初始化实现了 Runnable 接口的 Sender 对象,并在 ioThread 中启动线程。

2 发送消息

  1. 执行消息拦截器
  2. 查询 Kafka 集群元数据
  3. 序列化 key、value
  4. 获取分区
  5. 把消息添加到记录累加器中
  6. 当 batch 满了,或者创建了新的 batch 后,唤醒 Sender 线程

  核心源码如下

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
	// 执行拦截器
	ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
	return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
	TopicPartition tp = null;
	// 获取元数据
	ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
	Cluster cluster = clusterAndWaitTime.cluster;
	// 序列化 key、value
	byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
	byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
	// 获取分区。
	// 如果为空,会计算 key 的 hash 值,再和该主题的分区总数取余得到分区号;
	// 如果 key 也为空,客户端会生成递增的随机整数,再和该主题的分区总数区域得到分区号。
	int partition = partition(record, serializedKey, serializedValue, cluster);
	tp = new TopicPartition(record.topic(), partition);
	// 校验序列化后的记录是否超过限制
	int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
			compressionType, serializedKey, serializedValue, headers);
	ensureValidRecordSize(serializedSize);
	// 时间戳,默认是 KafkaProducer 初始化时间
	long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
	// 初始化回调和响应的拦截器对象
	Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
	// 把消息添加到记录累加器中
	RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
			serializedValue, headers, interceptCallback, remainingWaitMs);
	if (result.batchIsFull || result.newBatchCreated) {
		// 当 batch 满了,或者创建了新的 batch 后,唤醒 Sender 线程
		this.sender.wakeup();
	}
	return result.future;
}

 2.1 查询元数据

  1. 如果根据指定的主题和分区能在缓存中查找到,则直接返回元数据,结束流程。
  2. 否则,设置需要更新元数据的标记 needUpdate=true,并获取当前的 version。
  3. 唤醒 Sender 线程,当 Sender 线程判断 needUpdate=true 时,发送获取元数据的请求到 broker,获取到后更新 needUpdate=true,version+1。
  4. 当前线程判断,如果 version 变大,说明元数据已更新,则跳出循环,拉取新的元数据,判断是否匹配到主题和分区,如果没有匹配到,返回第2步。
  5. 如果 version 没变大,说明元数据还没更新,则调用 wait(long timeout) 方法,等待 timeout 时间后,返回第4步。
  6. 当第4步获取到匹配的元数据后,返回给 doSend 方法。

  核心源码如下

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) {
	// 获取缓存的集群信息
	Cluster cluster = metadata.fetch();
	Integer partitionsCount = cluster.partitionCountForTopic(topic);
	// Return cached metadata if we have it, and if the record's partition is either undefined
	// or within the known partition range
	// 如果缓存中的数据满足条件,直接返回缓存中的元数据。
	if (partitionsCount != null && (partition == null || partition < partitionsCount))
		return new ClusterAndWaitTime(cluster, 0);

	long begin = time.milliseconds();
	long remainingWaitMs = maxWaitMs;
	long elapsed;
	do {
		// 更新元数据的标记 needUpdate=true,并获取当前的 version。
		int version = metadata.requestUpdate();
		sender.wakeup(); // 唤醒 Sender 线程
		try {
			metadata.awaitUpdate(version, remainingWaitMs); // 等待更新
		} catch (TimeoutException ex) {
		}
		cluster = metadata.fetch(); // 重新获取元数据
		elapsed = time.milliseconds() - begin;
		if (elapsed >= maxWaitMs) // 超出最大等待时间,抛出异常
			throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
		remainingWaitMs = maxWaitMs - elapsed;
		partitionsCount = cluster.partitionCountForTopic(topic);
	} while (partitionsCount == null); // 分区数量是 0,继续上述循环
	if (partition != null && partition >= partitionsCount) { // 当指定的分区号大于等于分数总数时,异常
		throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
	}
	return new ClusterAndWaitTime(cluster, elapsed);
}
// 等待更新
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
     long begin = System.currentTimeMillis();
     long remainingWaitMs = maxWaitMs;
     // 版本号<=当前版本号,说明未更新,需要继续循环等待更新
     while ((this.version <= lastVersion) && !isClosed()) {
         if (remainingWaitMs != 0)
             wait(remainingWaitMs); // 等待一会再判断
         long elapsed = System.currentTimeMillis() - begin;
         if (elapsed >= maxWaitMs) // 超过了最大等待时间
             throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
         remainingWaitMs = maxWaitMs - elapsed;
     }
 }

 2.2 消息添加到累加器 RecordAccumulator

  2.2.1 缓冲池 BufferPool

  Kafka 使用缓冲池技术给消息分配堆字节缓存 HeapByteBuffer,缓冲池的空闲队列 free 存放了空闲的缓存队列,优先直接从中取出第一个进行分配缓存,如果缓冲池不够了,利用 ReentrantLock + Condition 构造等待队列,等待缓冲池足够分配。
  Kafka 在处理消息响应时,释放分配的内存,并把加入空闲队列 free。

// 缓冲池
public class BufferPool {
	// 可用总内存 buffer.memory
    private final long totalMemory;
	// 一批消息的大小 batch.size
    private final int poolableSize;
    private final ReentrantLock lock;
	// 空闲缓存队列
    private final Deque<ByteBuffer> free;
	// 等待队列
    private final Deque<Condition> waiters;
    // 可用未分配的内存总量是nonPooledAvailableMemory和free * poolableSize中字节缓冲区的总和。
    private long nonPooledAvailableMemory;
}
// 字节缓冲分配
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
	if (size > this.totalMemory)
		throw new IllegalArgumentException("消息大小超过总内存");

	ByteBuffer buffer = null;
	this.lock.lock();
	try {
		// 直接在空闲队列分配
		if (size == poolableSize && !this.free.isEmpty())
			return this.free.pollFirst();

		// 计算空闲队列总大小
		int freeListSize = this.free.size() * this.poolableSize;
		if (this.nonPooledAvailableMemory + freeListSize >= size) { // 可用的总内存(未分配的+空闲队列)>消息大小
			// we have enough unallocated or pooled memory to immediately
			// satisfy the request, but need to allocate the buffer
			freeUp(size);
			this.nonPooledAvailableMemory -= size; // 未分配内存总数-消息大小
		} else { // 内存不够分配
			int accumulated = 0;
			Condition moreMemory = this.lock.newCondition();
			try {
				long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
				this.waiters.addLast(moreMemory); // 加入等待队列
				// loop over and over until we have a buffer or have reserved
				// enough memory to allocate one
				while (accumulated < size) { //  轮询,直到足够分配内存
					long startWaitNs = time.nanoseconds();
					long timeNs;
					boolean waitingTimeElapsed;
					try { // 等待一段时间
						waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
					}

					remainingTimeToBlockNs -= timeNs;

					// 直接在空闲队列分配
					if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
						buffer = this.free.pollFirst();
						accumulated = size;
					} else { // 内存不够,accumulated累加计数
						freeUp(size - accumulated);
						int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
						this.nonPooledAvailableMemory -= got;
						accumulated += got;
					}
				}
				accumulated = 0; // 清空
			}
		}
	}

	if (buffer == null) // 没有在空闲队列分配到内存,需要在堆上分配内存
		return new HeapByteBuffer(size, size);
	else
		return buffer;
}
private void freeUp(int size) {
	while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
		this.nonPooledAvailableMemory += this.free.pollLast().capacity(); // 释放空闲队列的内存
}
// 处理生产者响应消息时,释放分配的内存
public void deallocate(ByteBuffer buffer, int size) {
	lock.lock();
	try {
		if (size == this.poolableSize && size == buffer.capacity()) {
			buffer.clear();
			this.free.add(buffer); // 加到空闲队列
		} else {
			this.nonPooledAvailableMemory += size; // 增加未分配内存数量
		}
		Condition moreMem = this.waiters.peekFirst();
		if (moreMem != null)
			moreMem.signal();
	} finally {
		lock.unlock();
	}
}

  2.2.2 消息缓存 CopyOnWriteMap

  累加器使用 CopyOnWriteMap 来缓存消息,key 是主题分区信息,value 是个双端队列,队列中的对象是压缩后的批量消息。

// 累加器缓存
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

  CopyOnWriteMap 是线程安全的,是由 Kafka 实现的写时复制 Map,内部定义了 volatile 的 Map,读时不用加锁,直接读取,写时需要加锁,然后拷贝一个 Map 副本进行实际的写入,写入完成后再把原来的 Map 指向修改后的 Map。
  双端队列 Deque 实际上就是 ArrayDeque,非线程安全的,需要手动同步。使用双端队列可以在消息发送失败时,把消息直接放回队列头部进行重试。

// 累加消息到缓存
public RecordAppendResult append(TopicPartition tp,
								 long timestamp,
								 byte[] key,
								 byte[] value,
								 Header[] headers,
								 Callback callback,
								 long maxTimeToBlock) throws InterruptedException {
	ByteBuffer buffer = null;
	try {
		Deque<ProducerBatch> dq = getOrCreateDeque(tp); // 检查 batches 是否有该分区的映射,如果没有,则创建一个
		synchronized (dq) { // 加锁后分配
			RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
			if (appendResult != null)
				return appendResult;
		}

		byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
		// 计算消息大小
		int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
		buffer = free.allocate(size, maxTimeToBlock); // 利用 BufferPool 分配字节缓存
		synchronized (dq) { // 加锁后分配
			RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
			// 构造出压缩后的批量消息对象 ProducerBatch
			MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
			ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
			FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

			dq.addLast(batch); // 加入双端队列

			return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
		}
	}
}
原文地址:https://www.cnblogs.com/bigshark/p/11183758.html