【Kafka】自定义分区策略

自定义分区策略


思路

Command+Option+shift+N 调出查询页面,找到producer包的Partitioner接口
在这里插入图片描述
Partitioner下有一个DefaultPartitioner实现类
在这里插入图片描述
这里就有之前提到kafka数据分区策略
在这里插入图片描述


自定义分区策略

创建一个MyPartitioner类,继承并重新定义上面的Partitioner类

package cn.itcast.kafka.demo1;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {
    /**
     * 此方法是确定分区规则
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return 返回的int值为分区
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    	//return 3 则指定发送数据到3分区
        return 3;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

还需要在MyProducer中添加一行代码

props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");

而且在MyProducer类中不需要指定分区号

producer.send(new ProducerRecord<String, String>("test" , "mykey" + i,"这是第" + i + "条message"));
原文地址:https://www.cnblogs.com/zzzsw0412/p/12772448.html