kafka学习总结003 --- 生产者分区策略

分区结构

如下图是官网上kafka三级结构图,三级结构为主题---分区---消息,并且每条消息只能保存在某一个分区内;

kafka虽然是一个队列但是不保证消息有序,但是对于分区来说消息是有序的

为什么分区

分区的作用就是提供生产消费数据负载分担的能力;不同的分区被分配在不同的节点,数据的生产消费是基于分区粒度进行的,

这样每个节点都能独立的执行各自分区的数据生产消费,而且我们可以按需增加新的节点提升系统的吞吐量。

分区策略

所谓的生产者分区策略就是决定生产消息时,如何写入到不同的分区中;kafka提供了默认的分区策略,当然我们也能自定义分区策略(通过指定生产者partitioner.class参数)

kafka提供了三种分区策略:轮询策略、随机策略、按消息键保序策略

1、轮询策略

这是默认的分区策略,能够保证消息最大限度的被平均分配到所有分区

2、随机策略(已经过时了)

也就是生产的消息被随机分配到不同的分区,实际的表现逊于轮询策略;实际上,老的kafka版本用的是随机策略,新的版本已经改成轮询策略了

3、按消息键保序策略

生产消息时,为每条消息定义消息键key,消息键是一个有着明确含义的业务字符串,可以是业务ID之类的;通过消息键,相同的消息键的消息能被保证写入相同的分区

注意kafka不同版本的差异:

kafka-2.4.0之前的版本提供了一个默认策略:org.apache.kafka.clients.producer.internals.DefaultPartitioner

这个分区策略的流程是:如果消息key为空,先随机选择一个分区,后续按照轮询策略分配分区

kafka-2.4.0及以后的版本做了如下的变更,提供了如下三个分区策略:

1、org.apache.kafka.clients.producer.internals.DefaultPartitioner

默认策略,做的变动是:如果消息键为空消息发送的分区先保持粘性(也就是先向同一个分区发送);如果当前batch已满或者linger.ms超时已经发送,那么新的消息会发给另外的分区(选择策略还是Round-Robin)

这样变动的原因个人理解是为了减少客户端和服务端的交互次数,消息按照batchSize发送

2、org.apache.kafka.clients.producer.RoundRobinPartitioner

轮询策略,没啥可说的

3、org.apache.kafka.clients.producer.UniformStickyPartitioner

默认策略,消息key为空场景

自定义分区策略

kafka java api提供了一个接口,用于自定义分区策略:org.apache.kafka.clients.producer.Partitioner

public interface Partitioner extends Configurable, Closeable {
    int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);

    void close();

    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

RoundRobinPartitioner(轮询策略)源码:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}
原文地址:https://www.cnblogs.com/sniffs/p/13019316.html