(09) A simple example of writing Kafka data to Hadoop

  This article makes a few modifications on top of (08) Example of a Java program connecting to Kafka.

  1. Import the jar packages required for Hadoop

  When Hadoop is installed on the server, the required jars are already there after unpacking; they live under the following paths (the dependency jars in each directory's lib subdirectory are typically needed as well):

  /usr/local/hadoop-2.7.3/share/hadoop/common

  /usr/local/hadoop-2.7.3/share/hadoop/hdfs

  2. Create the Hadoop connection class

package demo;

import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtils {

    private static FileSystem fs;
    
    // initialize the FileSystem client
    static{
        Configuration conf = new Configuration();
        // let appends succeed on a small cluster: never try to replace a failed
        // datanode in the write pipeline
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        try {
            fs = FileSystem.get(new URI("hdfs://192.168.7.151:9000"), conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * Write the data the consumer fetched from the topic to HDFS
     * @param filename the target file on HDFS
     * @param data the data to write
     */
    public static void sendToHDFS(String filename, String data) throws Exception{
        OutputStream out = null;
        if(!fs.exists(new Path(filename))){
            // the file does not exist yet: create it
            out = fs.create(new Path(filename));
        }else{
            // the file already exists: append the new content
            out = fs.append(new Path(filename));
        }
        // write the data to HDFS and close the stream
        out.write(data.getBytes());
        out.close();
    }
}
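
  Before wiring this into the consumer, the class can be smoke-tested on its own. The sketch below is not part of the original article: the class name HDFSUtilsTest and the test strings are mine, and it assumes the NameNode at hdfs://192.168.7.151:9000 is reachable from the machine running it.

package demo;

// Minimal smoke test for HDFSUtils: writes two lines to /kafka/data.txt.
// Hypothetical helper, assuming the Hadoop jars from step 1 are on the classpath.
public class HDFSUtilsTest {

    public static void main(String[] args) throws Exception {
        // the first call creates the file, the second call appends to it
        HDFSUtils.sendToHDFS("/kafka/data.txt", "test line 1\n");
        HDFSUtils.sendToHDFS("/kafka/data.txt", "test line 2\n");
        System.out.println("Wrote two lines to /kafka/data.txt");
    }
}

  Note that sendToHDFS opens and closes a stream for every message. That keeps the example simple; at higher message rates you would normally keep the stream open and flush periodically instead.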

  3. Create the producer class, the same as in article (08), unchanged

package demo;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class ProducerDemo extends Thread {
    
    // the topic to send to
    private String topic;
    
    public ProducerDemo(String topic){
        this.topic = topic;
    }
    
    // send one message every 5 seconds
    public void run(){
        // create a producer instance
        Producer producer = createProducer();
        // send messages
        int i = 1;
        while(true){
            String data = "message " + i++;
            // send the message with the producer
            producer.send(new KeyedMessage(this.topic, data));
            // log what was sent
            System.out.println("Sending data: " + data);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    // create the Producer instance
    private Producer createProducer() {
        Properties prop = new Properties();
        // ZooKeeper ensemble
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        // broker addresses
        prop.put("metadata.broker.list", "192.168.7.151:9092,192.168.7.151:9093");
        return new Producer(new ProducerConfig(prop));
    }
    
    public static void main(String[] args) {
        // start the thread that sends messages
        new ProducerDemo("mydemo1").start();
    }
}
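
  The class above uses the old Scala producer API (kafka.javaapi.producer.Producer), which has been removed from newer Kafka releases. For reference only, a rough equivalent with the newer org.apache.kafka.clients.producer API might look like the sketch below; it assumes the kafka-clients jar is on the classpath, and the class name NewApiProducerDemo is mine, not part of the original article.

package demo;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical alternative to ProducerDemo using the new producer client API.
public class NewApiProducerDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // the new client talks to the brokers directly; no ZooKeeper setting is needed
        props.put("bootstrap.servers", "192.168.7.151:9092,192.168.7.151:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            int i = 1;
            while (true) {
                String data = "message " + i++;
                producer.send(new ProducerRecord<>("mydemo1", data));
                System.out.println("Sending data: " + data);
                TimeUnit.SECONDS.sleep(5);
            }
        }
    }
}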

  4. Create the consumer class. Lines 43 to 47 are new; they write the consumed messages to Hadoop.

 1 package demo;
 2 
 3 import java.util.HashMap;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.Properties;
 7 
 8 
 9 import kafka.consumer.Consumer;
10 import kafka.consumer.ConsumerConfig;
11 import kafka.consumer.ConsumerIterator;
12 import kafka.consumer.KafkaStream;
13 import kafka.javaapi.consumer.ConsumerConnector;
14 
15 public class ConsumerDemo extends Thread {
16 
17     // the topic to consume
18     private String topic;
19     
20     public ConsumerDemo(String topic){
21         this.topic = topic;
22     }
23     
24     public void run(){
25         // create the consumer connector
26         ConsumerConnector consumer = createConsumer();
27         // build a Map describing the topics to consume
28         // String: topic name  Integer: number of streams (threads) used to consume it
29         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
30         // use a single stream for this topic
31         topicCountMap.put(this.topic, 1);
32         // create the message streams
33         // String: topic name  List: the streams for that topic
34         Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
35         // take the single stream for this topic and iterate over its messages
36         KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
37         ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
38         while(iterator.hasNext()){
39             String message = new String(iterator.next().message());
40             System.out.println("Received data: " + message);
41             
42             // write the message to HDFS
43             try {
44                 HDFSUtils.sendToHDFS("/kafka/data.txt", message);
45             } catch (Exception e) {
46                 e.printStackTrace();
47             }
48             
49         }
50     }
51     
52     // create the consumer connector
53     private ConsumerConnector createConsumer() {
54         Properties prop = new Properties();
55         // ZooKeeper addresses
56         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
57         // the consumer group this consumer belongs to
58         prop.put("group.id", "group1");
59         // too small a value here may cause connection timeouts
60         prop.put("zookeeper.connection.timeout.ms", "60000");
61         return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
62     }
63 
64     public static void main(String[] args) {
65         new ConsumerDemo("mydemo1").start();
66     }
67 
68 }
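
  As an aside, the high-level consumer API used above (kafka.javaapi.consumer.ConsumerConnector) has also been removed from newer Kafka releases. A rough equivalent with the new org.apache.kafka.clients.consumer API might look like the sketch below; it assumes a recent kafka-clients jar on the classpath, and the class name NewApiConsumerDemo is mine, not part of the original article.

package demo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical alternative to ConsumerDemo using the new consumer client API.
public class NewApiConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        // the new consumer talks to the brokers directly instead of ZooKeeper
        props.put("bootstrap.servers", "192.168.7.151:9092,192.168.7.151:9093");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mydemo1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received data: " + record.value());
                    // write the message to HDFS, same as lines 43-47 above
                    try {
                        HDFSUtils.sendToHDFS("/kafka/data.txt", record.value());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}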

  With the HDFS utility, producer, and consumer classes in place, the code is complete. Before the programs are started, there is no data in Hadoop.

  After starting the producer and the consumer, checking again shows that data has been written.

  Running hdfs dfs -cat /kafka/data.txt shows the file contents, which are exactly the messages received by the consumer.
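
  The same check can be done from Java instead of the command line. The sketch below is mine (the class name HDFSCatDemo is hypothetical): it opens /kafka/data.txt through the HDFS client API and copies it to standard output.

package demo;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Hypothetical helper that prints /kafka/data.txt, equivalent to hdfs dfs -cat.
public class HDFSCatDemo {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.7.151:9000"), new Configuration());
        try (FSDataInputStream in = fs.open(new Path("/kafka/data.txt"))) {
            // copy the file contents to standard output without closing System.out
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        fs.close();
    }
}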

Original article: https://www.cnblogs.com/javasl/p/12291080.html