Kafka+Storm写入Hbase和HDFS

1.Storm整合Kafka

使用Kafka作为数据源,起到缓冲的作用

 1  // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
 2 String zks = KafkaProperties.Connect;
 3 BrokerHosts brokerHosts = new ZkHosts(zks);
 4 String topic = KafkaProperties.topic;
 5 String group = KafkaProperties.groupId;
 6 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/storm", group);
 7 spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
 8 spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.211.1","192.168.211.2","192.168.211.3"});
 9 spoutConfig.zkPort = 2181;
10 spoutConfig.ignoreZkOffsets = true;
11 spoutConfig.startOffsetTime=-2L;
12 
13 KafkaSpout receiver = new KafkaSpout(spoutConfig);
14 topologyBuilder.setSpout("kafka-spout", receiver);

KafkaProperties:

/**
 * 配置一些Storm从kafka取数据时,一些关于数据源的配置信息
 * @author kongc
 *
 */
public interface KafkaProperties {
   final static String Connect = "192.168.211.1:2181,192.168.211.2:2181,192.168.211.3:2181";
   final static String groupId = "kafka";
   final static String topic = "test_topic";
}

2.Storm整合HDFS

我们希望按照日期,创建文件,将Storm计算后的数据写入HDFS

采取的策略是通过获取系统当前时间,然后格式化成所要命名的字符串作为path,然后判断这个路径是否存在,存在则追加写入,不存在则创建。

/***************将数据存入HDFS**********************/
Path path = new Path("hdfs://192.168.1.170:8020/user/hive/warehouse/test_oee/" + format + "oee.txt");
synchronized (path) {
   try {
      if(KafkaTopology.fileSystem.exists(path)!=true){
         System.out.println("*************create*************");
         KafkaTopology.FDoutputStream = KafkaTopology.fileSystem.create(path, true);
      }else{
         if(KafkaTopology.FDoutputStream ==null){
            System.out.println("**************append*************");
            KafkaTopology.FDoutputStream = KafkaTopology.fileSystem.append(path);
         }
      }
      String data = mesg.getEquipment_name()+","+mesg.getDown_time()+","+mesg.getQualified_count()+","+mesg.getQualified_count()+","+mesg.getAll_count()+","+mesg.getPlan_time()+","+mesg.getProduce_time()+"\n";
      KafkaTopology.FDoutputStream.write(data.getBytes());
      KafkaTopology.FDoutputStream.close();
      KafkaTopology.FDoutputStream = null;
   } catch (IOException e) {
      e.printStackTrace();
   }

}

Storm整合Hbase

Storm写入Hbase

 /****************存入Hbase*****************/
String[] value = {
      mesg.getEquipment_name(),
      mesg.getDown_time(),
      mesg.getQualified_count(),
      mesg.getAll_count(),
      mesg.getPlan_time(),
      mesg.getProduce_time()
};
//System.out.println("hbase==>:"+value.toString());
HbaseHelper.insertData(
      KafkaTopology.tableName, 
      mesg.getEquipment_name()+Math.random()*1000000000, 
      KafkaTopology.family,value
);
this.collector.ack(input);

在调试Storm的过程中遇到一些问题。

错误信息:

NIOServerCnxn - caught end of stream exception
ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15cf25cbf2d000d, likely client has closed socket
Caused by: java.lang.NullPointerException
ERROR o.a.s.util - Halting process: ("Worker died")

错误原因:

追踪源码找到打印此语句的位置

/** Read the request payload (everything following the length prefix) */
    private void readPayload() throws IOException, InterruptedException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
                //尝试一次读进来
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }
     //一次读完
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
                //server的packet统计
            packetReceived();
                //准备使用这个buffer了
            incomingBuffer.flip();
                //如果CoonectRequst还没来,那第一个packet肯定是他了
            if (!initialized) {
                readConnectRequest();
            } 
                //处理请他请求
            else {
                readRequest();
            }
                //清理现场,为下一个packet读做准备
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }
原文地址:https://www.cnblogs.com/kongcong/p/7112029.html