kafka-producer partitioner.class的使用

partitioner.class的说明

 

在API客户端中封装好的partition( )方法会为消息选择一个分区编号。为了保证消息负载均衡到每个分区,可以通过使用默认方式或者

手动配置这个参数的方式使消息发布到topic中。

1)如果默认不设置,就会通过计数器自增轮询的方式依次将消息分配到不同的分区上;

2)如果设置此值,就可以通过自定义设置,进行自主计算,此文档主要是代码。

JAVA代码配置了partitioner.class参数

public class ProducerPartitioner implements Partitioner {
	
	private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<String, AtomicInteger>();
	
	public ProducerPartitioner(){
		
	}
	@Override
	public void configure(Map<String, ?> arg0) {
		// TODO Auto-generated method stub
	}
	@Override
	public void close() {
		// TODO Auto-generated method stub
	}
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if(keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if(availablePartitions.size() > 0) {
                int part = Utils.abs(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.abs(nextValue) % numPartitions;
            }
        } else {
            return Utils.abs(Utils.murmur2(keyBytes)) % numPartitions;
        }
	}
	   
	private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if(null == counter) {
            counter = new AtomicInteger((new Random()).nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if(currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

}
原文地址:https://www.cnblogs.com/boanxin/p/9145273.html