Kafka partition assignment strategies

The partition is a key design concept in Kafka. Partitions sit under a topic, and messages are stored in partitions.

A produced message is actually written to a partition, and consuming a topic actually means pulling messages from its partitions.

When a topic is created, if the partition count is not specified, the num.partitions value configured in server.properties is used; you can also set it yourself.

For example, I created a topic named TEST with 10 partitions; describing TEST gives the following:

./kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic TEST
./kafka-topics --describe --bootstrap-server localhost:9092 --topic TEST
Topic:TEST    PartitionCount:10    ReplicationFactor:1    Configs:min.insync.replicas=1,segment.bytes=1073741824,retention.ms=604800000,max.message.bytes=1000000,min.cleanable.dirty.ratio=0.5,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=604800000
    Topic: TEST    Partition: 0    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 1    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 2    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 3    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 4    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 5    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 6    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 7    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 8    Leader: 44    Replicas: 44    Isr: 44
    Topic: TEST    Partition: 9    Leader: 44    Replicas: 44    Isr: 44

Producer and partition

When a producer sends data to a topic, ProducerRecord<K,V> has the following fields:

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

Which partition a record ends up in is determined mainly by the partition and key fields. Picking the partition follows a strategy, described in the official docs as follows:

The default partitioning strategy:

If a partition is specified in the record, use it

If no partition is specified but a key is present choose a partition based on a hash of the key

If no partition or key is present choose a partition in a round-robin fashion (good old round-robin)
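
These three cases map directly onto the ProducerRecord constructors. A minimal sketch (the topic name, keys and values below are made up for illustration):

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoiceExamples {
    public static void main(String[] args) {
        // 1. Partition given explicitly: the record goes to partition 3 of TEST
        ProducerRecord<String, String> r1 = new ProducerRecord<>("TEST", 3, "user-42", "payload-a");

        // 2. No partition but a key: partition = murmur2(serialized key) % numPartitions
        ProducerRecord<String, String> r2 = new ProducerRecord<>("TEST", "user-42", "payload-b");

        // 3. Neither partition nor key: the default partitioner picks one round-robin
        ProducerRecord<String, String> r3 = new ProducerRecord<>("TEST", "payload-c");

        // Only r1 carries a partition number here (3); for r2 and r3 partition() is null,
        // because the choice is made inside the producer when the record is sent
        System.out.println(r1.partition() + " / " + r2.partition() + " / " + r3.partition());
    }
}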

When no partition is specified, here is how the source computes it (org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition()):

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
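
If the default strategy does not fit, the producer can be pointed at a different Partitioner implementation through the partitioner.class setting. A minimal sketch, assuming a hypothetical com.example.MyPartitioner class that implements org.apache.kafka.clients.producer.Partitioner:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerWithCustomPartitioner {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // partitioner.class: com.example.MyPartitioner is a placeholder for your own
        // implementation of org.apache.kafka.clients.producer.Partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... send records, then close
        producer.close();
    }
}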

Consumer and partition

As an illustration (commonly drawn as a diagram): suppose a topic has four partitions (P0-P3) and there are two groups (Group A and Group B), where A has two consumers (C1-C2) and B has four (C3-C6).

The following facts are known:

A topic has multiple partitions;

Consumers subscribe to topics in the name of a consumer group;

A group contains one or more consumer instances;

At any given moment, a message is consumed by only one consumer in the group;

At any given moment, each partition is consumed by only one consumer in the group (this fact still holds through a rebalance).

By default:

If there are more partitions than consumer instances, then by the pigeonhole principle some consumer must handle more than one partition (two or more);

If the partition count equals the consumer count, each consumer consumes exactly one partition;

If there are fewer partitions than consumer instances, some consumers are bound to sit idle.

According to the docs, the assignment strategy is configured via partition.assignment.strategy. The default is range, which is a class: org.apache.kafka.clients.consumer.RangeAssignor

Looking at the source, there are three configurable partition assignment strategies: Range, RoundRobin and Sticky.
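
For reference, a minimal sketch of switching a consumer to one of the other assignors via that property (broker address and group id are placeholders; with no explicit setting the RangeAssignor default applies):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerWithAssignor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // partition.assignment.strategy: pick RangeAssignor, RoundRobinAssignor or StickyAssignor
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll
        consumer.close();
    }
}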

  •  Range

Partitions are assigned topic by topic (each topic is handled independently): first all partitions are sorted by partition ID, then all consumers in the group are sorted.

As an example, consider the three possible cases: the partition count is larger than and divisible by the consumer count, larger but not divisible, and smaller than the consumer count.

Digging into the source, the core piece of logic is fairly clear:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = this.consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap();
        Iterator i$ = subscriptions.keySet().iterator();
        while(i$.hasNext()) {
            String memberId = (String)i$.next();
            assignment.put(memberId, new ArrayList());
        }
        i$ = consumersPerTopic.entrySet().iterator();
        while(true) {
            String topic;
            List consumersForTopic;
            Integer numPartitionsForTopic;
            do {
                if (!i$.hasNext()) {
                    return assignment;
                }

                Entry<String, List<String>> topicEntry = (Entry)i$.next();
                topic = (String)topicEntry.getKey();
                consumersForTopic = (List)topicEntry.getValue();
                numPartitionsForTopic = (Integer)partitionsPerTopic.get(topic);
            } while(numPartitionsForTopic == null);
            // Everything above just collects the partition and consumer data; the algorithm starts here.
            // Sort the consumers for this topic; it is a List<String>, so the order is lexicographic.
            Collections.sort(consumersForTopic);
            // Quotient and remainder: partition count / consumer count, partition count % consumer count
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            int i = 0;
            // This loop works out the contiguous range of partitions each consumer ends up with
            for(int n = consumersForTopic.size(); i < n; ++i) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                ((List)assignment.get(consumersForTopic.get(i))).addAll(partitions.subList(start, start + length));
            }
        }
    }

You can see that consumers sorted to the front of the lexicographic order receive the extra partitions (assuming there are fewer consumers than partitions), and as the number of subscribed topics grows, the consumer at the head of the list picks up more and more extra partitions.
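
To make the arithmetic concrete, here is a standalone sketch that reproduces the start/length computation above for a single topic (the counts and consumer names are made up):

import java.util.ArrayList;
import java.util.List;

public class RangeMath {
    public static void main(String[] args) {
        int numPartitions = 10;                              // e.g. the TEST topic above
        List<String> consumers = List.of("c0", "c1", "c2");  // already in lexicographic order

        int perConsumer = numPartitions / consumers.size();  // 3
        int withExtra   = numPartitions % consumers.size();  // 1 -> the first consumer gets one more

        for (int i = 0; i < consumers.size(); i++) {
            int start  = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) owned.add(p);
            System.out.println(consumers.get(i) + " -> partitions " + owned);
        }
        // Prints: c0 -> [0, 1, 2, 3], c1 -> [4, 5, 6], c2 -> [7, 8, 9]
    }
}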

  • RoundRobin

All partitions of all topics subscribed by the group, and all consumers in the group, are sorted, and the partitions are then handed out as evenly as possible.

If all consumers in the group subscribe to the same topics, it follows that the number of partitions assigned to any two consumers differs by at most one; what "round-robin" means should be self-explanatory.

The RoundRobin strategy is a small step up from range: the resulting assignment is a bit more balanced.
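
A rough sketch of the idea, not the real RoundRobinAssignor (which also has to honour each consumer's individual subscriptions): sort all topic-partitions and all consumers, then deal the partitions out one at a time.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinSketch {
    public static void main(String[] args) {
        // Made-up example: two topics with 3 and 4 partitions, every consumer subscribes to both
        List<String> partitions = new ArrayList<>();
        for (int p = 0; p < 3; p++) partitions.add("topicA-" + p);
        for (int p = 0; p < 4; p++) partitions.add("topicB-" + p);
        List<String> consumers = List.of("c0", "c1", "c2");   // already sorted

        Map<String, List<String>> assignment = new TreeMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));

        // Deal the sorted partitions out to the sorted consumers in a circle
        for (int i = 0; i < partitions.size(); i++) {
            assignment.get(consumers.get(i % consumers.size())).add(partitions.get(i));
        }
        System.out.println(assignment);
        // {c0=[topicA-0, topicB-0, topicB-3], c1=[topicA-1, topicB-1], c2=[topicA-2, topicB-2]}
    }
}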

  • StickyAssignor

Some older versions only support the first two strategies; somewhat newer versions (at least 0.11 and above) also support this one.

With the first two strategies, once an assignment exists, any re-assignment (for example a rebalance) can reshuffle the partitions drastically, even though re-assignment is the less common case.

So a strategy is needed that, while staying as balanced as possible, changes the existing assignment as little as possible, which in turn saves much of the cost of re-assigning.

In short, two goals: stay balanced + move as little as possible. Naturally, the algorithm is also considerably more complex than the previous two.
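
A toy illustration of the "balanced + minimal movement" goal, not Kafka's actual StickyAssignor: when a consumer leaves, the survivors keep what they already own and only the orphaned partitions are re-dealt.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StickyIdeaSketch {
    public static void main(String[] args) {
        // Previous assignment: 3 consumers, 6 partitions
        Map<String, List<Integer>> assignment = new TreeMap<>();
        assignment.put("c0", new ArrayList<>(List.of(0, 1)));
        assignment.put("c1", new ArrayList<>(List.of(2, 3)));
        assignment.put("c2", new ArrayList<>(List.of(4, 5)));

        // c1 leaves the group: its partitions become orphans
        List<Integer> orphans = assignment.remove("c1");

        // Survivors keep their partitions; each orphan goes to the least-loaded survivor
        List<String> survivors = new ArrayList<>(assignment.keySet());
        for (int orphan : orphans) {
            survivors.sort((a, b) -> assignment.get(a).size() - assignment.get(b).size());
            assignment.get(survivors.get(0)).add(orphan);
        }
        System.out.println(assignment);   // {c0=[0, 1, 2], c2=[4, 5, 3]}
    }
}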

Broker and partition

The partition count chosen at topic creation is up to you, and a Kafka cluster usually has several brokers, so how partitions are placed onto brokers is also a deliberate design.

The design follows this line of thinking:

1. The partitions of each topic should be spread as evenly as possible across the brokers;

2. The number of partitions landing on each broker should be as even as possible;

3. Partitions also need replicas for high availability, and the replicas should likewise be spread as evenly as possible across the brokers;

Given the above, the approach often described online (sort the partitions, sort the brokers, then take a simple modulo) is clearly not enough: the brokers at the front of the list would always end up doing more work.

The assignment source code is in Scala; even though the syntax differs from Java, the logic is easy to follow by reading the code (kafka.admin.AdminUtils.assignReplicasToBrokers()).

As usual, start with the author's own comment:

* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
*
* To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.

The implementation idea:

1. Pick a random starting position in the broker list and assign the first replica of each partition to the brokers in round-robin fashion;

2. Place each partition's remaining replicas on the other brokers with an increasing shift.

Below are some brief annotations based on the source logic:

def assignReplicasToBrokers(brokerList: Seq[Int],        ----- the broker list
                            nPartitions: Int,            ----- number of partitions to assign
                            replicationFactor: Int,      ----- replica count per partition
                            fixedStartIndex: Int = -1,   ----- fixed starting broker index (random if negative)
                            startPartitionId: Int = -1)  ----- which partition of the topic to start from
: Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new AdminOperationException("number of partitions must be larger than 0")
  if (replicationFactor <= 0)
    throw new AdminOperationException("replication factor must be larger than 0")
  if (replicationFactor > brokerList.size)               ----- the replication factor cannot exceed the broker count
    throw new AdminOperationException("replication factor: " + replicationFactor +
      " larger than available brokers: " + brokerList.size)
  val ret = new mutable.HashMap[Int, List[Int]]()        ----- the result: a HashMap with partition id as key and the assigned broker list as value
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) --- pick a random starting broker
  var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0  ---- start from the given partition, otherwise from partition 0
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)  ---- the shift used to spread the remaining replicas
  for (i <- 0 until nPartitions) {                       ----- assign partition by partition
    if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
      nextReplicaShift += 1                              ----- the partition id is a multiple of brokerList.size, i.e. we wrapped around the broker list, so shift + 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size  ---- the first replica of each partition is placed separately and usually serves as the leader
    var replicaList = List(brokerList(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)             ---- place the remaining replicas of the current partition
      replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
    ret.put(currentPartitionId, replicaList.reverse)
    currentPartitionId = currentPartitionId + 1
  }
  ret.toMap
}
// For every replica after the first, compute which broker that replica should land on
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

The overall flow: initialize the starting broker (the given one, otherwise chosen at random), the starting partition (the given one, otherwise partition 0) and the shift (given, otherwise random), then

loop nPartitions times:

  if the partition id has wrapped around the end of the broker list, shift + 1;

  assign the first replica of the partition to a broker;

  an inner loop assigns the remaining replicas of this partition to brokers;

  partition id + 1, continue with the next partition;
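
Putting that loop into a runnable sketch (rack-unaware only; startIndex and the initial shift are fixed instead of random here so the output is reproducible):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReplicaAssignmentSketch {
    // Mirrors the rack-unaware logic of assignReplicasToBrokers above
    static Map<Integer, List<Integer>> assign(List<Integer> brokers, int nPartitions,
                                              int replicationFactor, int startIndex, int startShift) {
        Map<Integer, List<Integer>> ret = new TreeMap<>();
        int nextReplicaShift = startShift;
        for (int partition = 0; partition < nPartitions; partition++) {
            if (partition > 0 && partition % brokers.size() == 0)
                nextReplicaShift++;                           // wrapped around the broker list
            int firstReplicaIndex = (partition + startIndex) % brokers.size();
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(firstReplicaIndex));     // first replica, the preferred leader
            for (int j = 0; j < replicationFactor - 1; j++) { // remaining replicas, placed with the shift
                int shift = 1 + (nextReplicaShift + j) % (brokers.size() - 1);
                replicas.add(brokers.get((firstReplicaIndex + shift) % brokers.size()));
            }
            ret.put(partition, replicas);
        }
        return ret;
    }

    public static void main(String[] args) {
        // 5 brokers, 10 partitions, 3 replicas, startIndex = 0, initial shift = 0
        assign(List.of(0, 1, 2, 3, 4), 10, 3, 0, 0)
                .forEach((p, r) -> System.out.println("partition " + p + " -> brokers " + r));
    }
}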

Original article: https://www.cnblogs.com/yb38156/p/14722534.html