KafkaConsumer (reading data from a Kafka topic into HDFS)
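The two classes below pull messages from a Kafka topic with the old high-level consumer API (kafka.consumer.*) and write them, one per line, into an hourly file in HDFS. The program expects three command-line arguments: the ZooKeeper connect string, the target HDFS directory, and the topic name. A launch might look roughly like java -cp kafka-hdfs.jar com.xinsight.kafkaConsumer.KafkaConsumer Master:2181 /test mytopic, where the jar name, ZooKeeper host, and paths are placeholders rather than values from the original post.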

import kafka.consumer.ConsumerConfig;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
import kafka.serializer.StringDecoder;  
import kafka.utils.VerifiableProperties;  
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.xinsight.Pool.ThreadPoolManager;
import com.xinsight.Thread.ConsumerThread;
public class KafkaConsumer {  
    private ConsumerConnector consumer = null;  
    private static FSDataOutputStream  hdfsOutStream;
    private static FSDataInputStream is;
    public static FileSystem fs ;
    public static Configuration conf;
    public static int num = 0;
    private static String filePath;
    public static String lock = new String("lock"); // shared lock guarding writes to hdfsOutStream
    public static void setFSDataOutputStream(String filename){
        Path path = new Path(filename);
        synchronized (lock) {
            // Close the previous output file (if any) before switching to a new one.
            if (hdfsOutStream != null) {
                try {
                    hdfsOutStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                if (fs.exists(path)) {
                    // No HDFS append is used here: read the existing file back,
                    // recreate it, and rewrite its old contents so new messages can follow.
                    is = fs.open(path);
                    FileStatus stat = fs.getFileStatus(path);
                    byte[] buffer = new byte[(int) stat.getLen()];
                    is.readFully(0, buffer);
                    is.close();
                    fs.delete(path, false);
                    hdfsOutStream = fs.create(path);
                    hdfsOutStream.write(buffer);
                } else {
                    hdfsOutStream = fs.create(path);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static FSDataOutputStream getFSDataOutputStream(){
        return hdfsOutStream;
    }
    public static void init(){
        try {
            conf = new Configuration();
            String url = "hdfs://Master:9000/test";
            // Relax write-pipeline recovery for small clusters: never try to replace
            // a failed datanode while a block is being written.
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            fs = FileSystem.get(new URI(url), conf);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {  
        
        if (args.length == 3) {
            init();
            String fileName = getFileName();
            filePath = args[1] + "/" + fileName;
            KafkaConsumer.setFSDataOutputStream(filePath);
            /********** ZooKeeper connect string **********/
            String zkInfo = args[0];
            /********** target HDFS directory **********/
            String uri = args[1];
            /********** Kafka topic **********/
            String topic = args[2];
            KafkaConsumer consumer = new KafkaConsumer();
            consumer.setConsumer(zkInfo);
            consumer.consume(lock, topic);
        } else {
            System.out.println("usage: KafkaConsumer <zookeeper-connect> <hdfs-dir> <topic>");
        }
        
    }  
    /**
     * Load the consumer configuration and create the connector.
     * @param zkInfo
     */
    public void setConsumer(String zkInfo) {  
        Properties props = new Properties();  
        // ZooKeeper connection string
        props.put("zookeeper.connect", zkInfo);
        // consumer group id
        props.put("group.id", "jd-group");
        props.put("zookeeper.session.timeout.ms", "5000"); // timeout for this client's ZK session
        props.put("zookeeper.sync.time.ms", "200");        // how far a ZK follower may lag behind the leader
        props.put("auto.commit.interval.ms", "1000");      // how often offsets are committed to ZooKeeper
        props.put("auto.offset.reset", "smallest");        // start from the earliest available offset
        props.put("rebalance.max.retries", "5");   // when a new consumer joins the group, partition ownership is
                                                   // rebalanced; this caps the number of rebalance retries
        props.put("rebalance.backoff.ms", "1200"); // backoff between rebalance retries
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);  
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
    }  
    /**
     * Build the hourly file name (format yyyyMMddHH).
     * @return
     */
    public static String getFileName(){
        long time = System.currentTimeMillis();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
        String date = sdf.format(new Date(time));
        return date;
    }
    
    /**
     * Consume messages from the topic and hand each stream to a worker thread.
     * @param lock
     * @param topic
     * @throws InterruptedException 
     */
    public void consume(String lock,String topic) throws InterruptedException {  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 3); // three streams (threads) for this topic
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> streams = consumerMap.get(topic);
        System.out.println(streams.size());
        // Hand each stream to a worker thread from the project's thread pool.
        for (final KafkaStream<String, String> stream : streams) {
            ThreadPoolManager.dbShortSchedule(new ConsumerThread(stream, lock), 0);
        }
        System.out.println("finish");  
    }  
}  
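
Two project-local classes are referenced above. ConsumerThread, the per-stream worker, is shown in full below. ThreadPoolManager is not included in the original post, so the following is only a minimal sketch of what its dbShortSchedule(Runnable, delay) method might look like, assuming it wraps a shared ScheduledExecutorService; the real class may differ.

package com.xinsight.Pool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
// Hypothetical sketch only: the real ThreadPoolManager is not part of the post.
public class ThreadPoolManager {
    // Pool size is a guess; it must be at least the number of Kafka streams,
    // because each ConsumerThread runs (and blocks) indefinitely.
    private static final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(4);
    // Assumed signature based on the call site: run the task after `delay` seconds.
    public static void dbShortSchedule(Runnable task, long delay) {
        POOL.schedule(task, delay, TimeUnit.SECONDS);
    }
}
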
package com.xinsight.Thread;
import java.io.IOException;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import com.xinsight.kafkaConsumer.KafkaConsumer;
public class ConsumerThread implements Runnable{
    private String lock;
    private KafkaStream<String, String> m_stream;
    private int max_sync = 1000;   // flush to HDFS every 1000 messages
    private int current_sync = 0;
    public ConsumerThread(KafkaStream<String, String> a_stream, String lock) {
        this.m_stream = a_stream;
        this.lock = lock;
    }
    @Override
    public void run() {
        ConsumerIterator<String, String> it = m_stream.iterator();
        while (it.hasNext()) {
            String message = it.next().message();
            try {
                synchronized (lock) {
                    WriteFile(KafkaConsumer.getFSDataOutputStream(),message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } 
    }
    /**
     * Write one message (plus a newline) to HDFS.
     * @param hdfsOutStream
     * @param message
     * @throws IOException
     */
    public void WriteFile(FSDataOutputStream hdfsOutStream, String message) throws IOException {
        try {
            hdfsOutStream.write(message.getBytes());
            hdfsOutStream.write("\n".getBytes());
            current_sync++;
            // Flush to the datanodes every max_sync messages, then reset the counter.
            if (current_sync >= max_sync) {
                hdfsOutStream.sync();
                current_sync = 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
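
Once running, the messages should appear in a file named after the startup hour (yyyyMMddHH) under the directory given as the second argument, e.g. hdfs dfs -cat /test/2017022115 (illustrative; the actual path depends on args[1] and the hour). Note that the posted code builds the file name only once in main(), so it does not roll over to a new file when the hour changes.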
Original post (Chinese): https://www.cnblogs.com/zqzdong/p/6438962.html