Kafka 自定义分区

1:POM文件

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2:自定义分区

package com.kpwong.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Demo partitioner that routes every record to one fixed partition,
 * regardless of the record's key or value.
 *
 * <p>Register it on a producer through the {@code partitioner.class} setting.
 */
public class Mypartitioner implements Partitioner {

    /** Partition index that records are routed to whenever the topic has it. */
    private static final int TARGET_PARTITION = 1;

    /**
     * Chooses the partition for a record.
     *
     * @param topic      topic the record is being sent to
     * @param key        record key (unused)
     * @param keyBytes   serialized record key (unused)
     * @param value      record value (unused)
     * @param valueBytes serialized record value (unused)
     * @param cluster    cluster metadata used to look up the topic's partition count
     * @return {@code TARGET_PARTITION} when the topic has that partition (or its
     *         partition count is unknown); otherwise {@code 0}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionCount = cluster.partitionCountForTopic(topic);
        // A topic with too few partitions has no TARGET_PARTITION; fall back to the
        // first partition instead of returning an index that does not exist.
        if (partitionCount != null && partitionCount <= TARGET_PARTITION) {
            return 0;
        }
        return TARGET_PARTITION;
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() {
        // intentionally empty
    }

    /**
     * Receives the producer configuration; this partitioner reads none of it.
     *
     * @param map producer configuration entries (ignored)
     */
    @Override
    public void configure(Map<String, ?> map) {
        // intentionally empty
    }
}

3:生产者使用自定义分区

在生产者配置中通过 partitioner.class 指定自定义分区类:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kpwong.partitioner.Mypartitioner");

完整示例代码:

package com.kpwong.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Sample producer that sends ten string messages to the topic {@code "two"}
 * using the custom partitioner {@code com.kpwong.partitioner.Mypartitioner},
 * and prints the partition and offset reported for each acknowledged record.
 */
public class PartitionProducer {

    /**
     * Configures a producer, sends the messages {@code kpwong--11} through
     * {@code kpwong--20}, and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Producer configuration
        Properties properties = new Properties();
        // Kafka cluster broker list
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop202:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size in bytes
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Linger time in milliseconds
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size: 32 MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Key and value serializer classes
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // User-defined partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kpwong.partitioner.Mypartitioner");

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        try {
            // Send the records; the callback reports the outcome of each one
            for (int i = 11; i <= 20; i++) {
                producer.send(new ProducerRecord<String, String>("two", "kpwong--" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                            System.out.println(recordMetadata.partition() + "-----" + recordMetadata.offset());
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
        } finally {
            // Always close the producer, even if a send throws
            producer.close();
        }
    }
}

运行看结果:

原文地址:https://www.cnblogs.com/kpwong/p/14052950.html