hdfs文件写入kafka集群

1. 场景描述

因新增Kafka集群,需要将hdfs文件写入到新增的Kafka集群中,后来发现文件不多,就直接下载文件到本地,通过Main函数写入了,假如需要部署到服务器上执行,需将文件读取这块稍做修改。

2. 解决方案

代码是真实的代码,可以直接运行,只把Ip地址做了下隐藏而已。

2.1 真实代码

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.hadoop.conf.Configuration;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

@SuppressWarnings("all")
public class HdfsToKafka_test {
    public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
    public static final int chartsLength = charts.length;

    private static Configuration getConf(String hdfsInfo) {
        Configuration conf = new Configuration();
        // 文件系统为必须设置的内容。其他配置参数可以自行设置,且优先级最高
        if (hdfsInfo == null || hdfsInfo == "") {
            hdfsInfo = "hdfs://nstest";
        }
        conf.set("fs.defaultFS", hdfsInfo);
        return conf;
    }

    private static void writeKafka(String lineStr, String kafkaInfo, String topic) {
        if (kafkaInfo == null || kafkaInfo == "") {
            kafkaInfo = "10.192.168.10:9092,10.192.168.11:9092,10.192.168.12:9092";
        }
        Properties props = new Properties();
        props.put("metadata.broker.list", kafkaInfo);
        /**
         * 0表示不等待结果返回<br/>
         * 1表示等待至少有一个服务器返回数据接收标识<br/>
         * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
         * */
        props.put("request.required.acks", "0");
        /**
         * 内部发送数据是异步还是同步
         * sync:同步, 默认
         * async:异步
         */
        props.put("producer.type", "async");
        /**
         * 设置序列化的类
         * 可选:kafka.serializer.StringEncoder
         * 默认:kafka.serializer.DefaultEncoder
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /**
         * 设置分区类
         * 根据key进行数据分区
         * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区
         * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区
         */
        props.put("partitioner.class", "JavaKafkaProducerPartitioner");
        // 重试次数
        props.put("message.send.max.retries", "3");
        // 异步提交的时候(async),并发提交的记录数
        props.put("batch.num.messages", "200");
        // 设置缓冲区大小,默认10KB
        props.put("send.buffer.bytes", "102400");
        // 2. 构建Kafka Producer Configuration上下文
        ProducerConfig config = new ProducerConfig(props);
        // 3. 构建Producer对象
        final Producer<String, String> producer = new Producer<String, String>(config);
        // 发送数据
        KeyedMessage message = generateKeyedMessage(topic, lineStr);
        producer.send(message);
        System.out.println("发送数据:" + message);
    }

    /**
     * 产生一个消息
     *
     * @return
     */
    private static KeyedMessage<String, String> generateKeyedMessage(String topic, String linestr) {
        String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
        return new KeyedMessage(topic, key, linestr);
    }

    public static String hdfstoKafkafromLocal(String hdfsfileAdress, String hdfsInfo, String kafkaInfo, String topic) {

        String message = "";
        try {
            InputStream is = new FileInputStream("C:/hdfs/Order.json");
            InputStreamReader isr = new InputStreamReader(is, "utf-8");
            BufferedReader br = new BufferedReader(isr);
            String line = "";
            while ((line = br.readLine()) != null) {
                writeKafka(line, kafkaInfo, topic);
            }
        } catch (Exception e) {
            message = e.getMessage();
        }
        return message;
    }

    public static void main(String[] args) {
        hdfstoKafkafromLocal(null, null, null, "Order");
    }
}

还有一个类,感觉没啥用,但是上面的类引用到了,也给传一下吧,保证代码可用。

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JavaKafkaProducerPartitioner implements Partitioner {
    /**
     * 无参构造函数
     */
    public JavaKafkaProducerPartitioner() {
        this(new VerifiableProperties());
    }
    /**
     * 构造函数,必须给定
     *
     * @param properties 上下文
     */
    public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
        // nothings
    }
    public int partition(Object key, int numPartitions) {
        int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
        return num % numPartitions;
    }

}

2.2 代码说明

(1)main方式是入门类;

(2)hdfstoKafkafromLocal用于读取本地文件;

(3)writeKafka,kafka配置及写入;

(4)KeyedMessage,生成消息;

另外还有一点要说明,本机的Host文件要配置Kafka集群的域名解析,否则可能会连接失败。


原文地址:https://www.cnblogs.com/ruanjianlaowang/p/11182515.html