Storm流处理项目案例

1.项目框架

  

======================程序需要一步一步的调试=====================

一:第一步,KafkaSpout与驱动类

1.此时启动的服务有

  

2.主驱动类

 1 package com.jun.it2;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.generated.AlreadyAliveException;
 7 import backtype.storm.generated.InvalidTopologyException;
 8 import backtype.storm.generated.StormTopology;
 9 import backtype.storm.spout.SchemeAsMultiScheme;
10 import backtype.storm.topology.IRichSpout;
11 import backtype.storm.topology.TopologyBuilder;
12 import storm.kafka.*;
13 
14 import java.util.UUID;
15 
16 public class WebLogStatictis {
17     /**
18      * 主函数
19      * @param args
20      */
21     public static void main(String[] args) {
22         WebLogStatictis webLogStatictis=new WebLogStatictis();
23         StormTopology stormTopology=webLogStatictis.createTopology();
24         Config config=new Config();
25         //集群或者本地
26         //conf.setNumAckers(4);
27         if(args == null || args.length == 0){
28             // 本地执行
29             LocalCluster localCluster = new LocalCluster();
30             localCluster.submitTopology("webloganalyse", config , stormTopology);
31         }else{
32             // 提交到集群上执行
33             config.setNumWorkers(4); // 指定使用多少个进程来执行该Topology
34             try {
35                 StormSubmitter.submitTopology(args[0],config, stormTopology);
36             } catch (AlreadyAliveException e) {
37                 e.printStackTrace();
38             } catch (InvalidTopologyException e) {
39                 e.printStackTrace();
40             }
41         }
42 
43     }
44     /**
45      * 构造一个kafkaspout
46      * @return
47      */
48     private IRichSpout generateSpout(){
49         BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
50         String topic = "nginxlog";
51         String zkRoot = "/" + topic;
52         String id = UUID.randomUUID().toString();
53         SpoutConfig spoutConf = new SpoutConfig(hosts,topic,zkRoot,id);
54         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // 按字符串解析
55         spoutConf.forceFromStart = true;
56         KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
57         return kafkaSpout;
58     }
59 
60     public StormTopology createTopology() {
61         TopologyBuilder topologyBuilder=new TopologyBuilder();
62         //指定Spout
63         topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID,generateSpout());
64         //
65         topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT,new WebLogParserBolt()).shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);
66 
67         return topologyBuilder.createTopology();
68     }
69 
70 }

3.WebLogParserBolt

  这个主要的是打印Kafka的Spout发送的数据是否正确。

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Tuple;
 8 
 9 import java.util.Map;
10 
11 public class WebLogParserBolt implements IRichBolt {
12     @Override
13     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
14 
15     }
16 
17     @Override
18     public void execute(Tuple tuple) {
19         String webLog=tuple.getStringByField("str");
20         System.out.println(webLog);
21     }
22 
23     @Override
24     public void cleanup() {
25 
26     }
27 
28     @Override
29     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
30 
31     }
32 
33     @Override
34     public Map<String, Object> getComponentConfiguration() {
35         return null;
36     }
37 }

4.运行Main

  先消费在Topic中的数据。

  

5.运行kafka的生产者

   bin/kafka-console-producer.sh --topic nginxlog --broker-list linux-hadoop01.ibeifeng.com:9092

  

6.拷贝数据到kafka生产者控制台

  

7.Main下面控制台的程序

  

二:第二步,解析Log

1.WebLogParserBolt

  如果要是验证,就删除两个部分,打开一个注释:

    删掉分流

    删掉发射

    打开打印的注释。

2.效果

  这个只要启动Main函数就可以验证。

  

3.WebLogParserBolt

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Tuple;
 9 import backtype.storm.tuple.Values;
10 
11 import java.text.DateFormat;
12 import java.text.SimpleDateFormat;
13 import java.util.Date;
14 import java.util.Map;
15 import java.util.regex.Matcher;
16 import java.util.regex.Pattern;
17 
18 import static com.jun.it2.WebLogConstants.*;
19 
20 public class WebLogParserBolt implements IRichBolt {
21     private Pattern pattern;
22 
23     private OutputCollector  outputCollector;
24     @Override
25     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
26         pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \[([\d+]*)\] \"[^ ]* ([^ ]*) [^ ]*\" \d{3} \d+ \"([^"]*)\" \"([^"]*)\" \"[^ ]*\"");
27         this.outputCollector = outputCollector;
28     }
29 
30     @Override
31     public void execute(Tuple tuple) {
32         String webLog=tuple.getStringByField("str");
33         if(webLog!= null || !"".equals(webLog)){
34 
35             Matcher matcher = pattern.matcher(webLog);
36             if(matcher.find()){
37                 //
38                 String ip = matcher.group(1);
39                 String serverTimeStr = matcher.group(2);
40 
41                 // 处理时间
42                 long timestamp = Long.parseLong(serverTimeStr);
43                 Date date = new Date();
44                 date.setTime(timestamp);
45 
46                 DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
47                 String dateStr = df.format(date);
48                 String day = dateStr.substring(0,8);
49                 String hour = dateStr.substring(0,10);
50                 String minute = dateStr ;
51 
52                 String requestUrl = matcher.group(3);
53                 String httpRefer = matcher.group(4);
54                 String userAgent = matcher.group(5);
55 
56                 //可以验证是否匹配正确
57 //                System.err.println(webLog);
58 //                System.err.println(
59 //                        "ip=" + ip
60 //                        + ", serverTimeStr=" + serverTimeStr
61 //                        +", requestUrl=" + requestUrl
62 //                        +", httpRefer=" + httpRefer
63 //                        +", userAgent=" + userAgent
64 //                );
65 
66                 //分流
67                 this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple,new Values(day, hour, minute, ip));
68                 this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple,new Values(day, hour, minute, requestUrl));
69                 this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple,new Values(day, hour, minute, httpRefer));
70                 this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple,new Values(day, hour, minute, userAgent));
71             }
72         }
73         this.outputCollector.ack(tuple);
74 
75     }
76 
77     @Override
78     public void cleanup() {
79 
80     }
81 
82     @Override
83     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
84         outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM,new Fields(DAY, HOUR, MINUTE, IP));
85         outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
86         outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
87         outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, USERAGENT));
88     }
89 
90     @Override
91     public Map<String, Object> getComponentConfiguration() {
92         return null;
93     }
94 }

三:第三步,通用计数器

1.CountKpiBolt

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Tuple;
 9 import backtype.storm.tuple.Values;
10 
11 import java.util.HashMap;
12 import java.util.Iterator;
13 import java.util.Map;
14 
15 public class CountKpiBolt implements IRichBolt {
16 
17     private String kpiType;
18 
19     private Map<String,Integer> kpiCounts;
20 
21     private String currentDay = "";
22 
23     private OutputCollector outputCollector;
24 
25     public CountKpiBolt(String kpiType){
26         this.kpiType = kpiType;
27     }
28 
29     @Override
30     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
31         this.kpiCounts = new HashMap<>();
32         this.outputCollector = outputCollector;
33     }
34 
35     @Override
36     public void execute(Tuple tuple) {
37         String day = tuple.getStringByField("day");
38         String hour = tuple.getStringByField("hour");
39         String minute = tuple.getStringByField("minute");
40         String kpi = tuple.getString(3);
41         //日期与KPI组合
42         String kpiByDay = day + "_" + kpi;
43         String kpiByHour = hour +"_" + kpi;
44         String kpiByMinute = minute + "_" + kpi;
45         //将计数信息存放到Map中
46         int kpiCountByDay = 0;
47         int kpiCountByHour = 0;
48         int kpiCountByMinute = 0;
49         if(kpiCounts.containsKey(kpiByDay)){
50             kpiCountByDay = kpiCounts.get(kpiByDay);
51         }
52         if(kpiCounts.containsKey(kpiByHour)){
53             kpiCountByHour = kpiCounts.get(kpiByHour);
54         }
55         if(kpiCounts.containsKey(kpiByMinute)){
56             kpiCountByMinute = kpiCounts.get(kpiByMinute);
57         }
58         kpiCountByDay ++;
59         kpiCountByHour ++;
60         kpiCountByMinute ++;
61         kpiCounts.put(kpiByDay, kpiCountByDay);
62         kpiCounts.put(kpiByHour, kpiCountByHour);
63         kpiCounts.put(kpiByMinute,kpiCountByMinute);
64         //隔天清空内存
65         if(!currentDay.equals(day)){
66             // 说明隔天了
67             Iterator<Map.Entry<String,Integer>> iter = kpiCounts.entrySet().iterator();
68             while(iter.hasNext()){
69                 Map.Entry<String,Integer> entry = iter.next();
70                 if(entry.getKey().startsWith(currentDay)){
71                     iter.remove();
72                 }
73             }
74         }
75         currentDay = day;
76         //发射
77         //发射两个字段
78         this.outputCollector.emit(tuple, new Values(kpiType+"_" + kpiByDay, kpiCountByDay));
79         this.outputCollector.emit(tuple, new Values(kpiType+"_" + kpiByHour, kpiCountByHour));
80         this.outputCollector.emit(tuple, new Values(kpiType+"_" + kpiByMinute, kpiCountByMinute));
81         this.outputCollector.ack(tuple);
82 
83     }
84 
85     @Override
86     public void cleanup() {
87 
88     }
89 
90     @Override
91     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
92         outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
93     }
94 
95     @Override
96     public Map<String, Object> getComponentConfiguration() {
97         return null;
98     }
99 }

2.SaveBolt.java

  主要是打印功能。

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Tuple;
 8 
 9 import java.util.Map;
10 
11 public class SaveBolt implements IRichBolt {
12 
13     @Override
14     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
15 
16     }
17 
18     @Override
19     public void execute(Tuple tuple) {
20         String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
21         Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
22         System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
23     }
24 
25     @Override
26     public void cleanup() {
27 
28     }
29 
30     @Override
31     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
32 
33     }
34 
35     @Override
36     public Map<String, Object> getComponentConfiguration() {
37         return null;
38     }
39 }

3.效果

  

四:保存到HBase中

1.SaveBolt.java

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Tuple;
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.hbase.HBaseConfiguration;
10 import org.apache.hadoop.hbase.client.HTable;
11 import org.apache.hadoop.hbase.client.Put;
12 import org.apache.hadoop.hbase.util.Bytes;
13 
14 import java.io.IOException;
15 import java.util.Map;
16 
17 import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;
18 
19 public class SaveBolt implements IRichBolt {
20     private HTable table;
21 
22     private OutputCollector outputCollector;
23     @Override
24     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
25         Configuration configuration = HBaseConfiguration.create();
26         try {
27             table = new HTable(configuration,HBASE_TABLENAME);
28         } catch (IOException e) {
29             e.printStackTrace();
30             throw new RuntimeException(e);
31         }
32 
33         this.outputCollector = outputCollector;
34     }
35 
36     @Override
37     public void execute(Tuple tuple) {
38         String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
39         Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
40 //        System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
41         if(serverTimeAndKpi!= null && kpiCounts != null){
42 
43             Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
44             String columnQuelifier = serverTimeAndKpi.split("_")[0];
45             put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
46                     Bytes.toBytes(columnQuelifier),Bytes.toBytes(""+kpiCounts));
47 
48             try {
49                 table.put(put);
50             } catch (IOException e) {
51                 throw new RuntimeException(e);
52             }
53         }
54         this.outputCollector.ack(tuple);
55     }
56 
57     @Override
58     public void cleanup() {
59         if(table!= null){
60             try {
61                 table.close();
62             } catch (IOException e) {
63                 e.printStackTrace();
64             }
65         }
66     }
67 
68     @Override
69     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
70 
71     }
72 
73     @Override
74     public Map<String, Object> getComponentConfiguration() {
75         return null;
76     }
77 }

2.当前服务

  

3.进入Hbase建表

  

4.运行程序

  出现报错信息

 1 ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
 2 java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
 3     at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
 4     at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
 5     at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
 6     at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
 7     at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
 8     at org.apache.hadoop.security.Groups.<init>(Groups.java:86) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
 9     at org.apache.hadoop.security.Groups.<init>(Groups.java:66) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
10     at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
11     at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:269) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
12     at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:246) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
13     at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:775) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
14     at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
15     at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
16     at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:260) [hbase-common-0.98.6-cdh5.3.6.jar:na]
17     at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:256) [hbase-common-0.98.6-cdh5.3.6.jar:na]
18     at org.apache.hadoop.hbase.security.User.getCurrent(User.java:160) [hbase-common-0.98.6-cdh5.3.6.jar:na]
19     at org.apache.hadoop.hbase.security.UserProvider.getCurrent(UserProvider.java:89) [hbase-common-0.98.6-cdh5.3.6.jar:na]
20     at org.apache.hadoop.hbase.client.HConnectionKey.<init>(HConnectionKey.java:70) [hbase-client-0.98.6-cdh5.3.6.jar:na]
21     at org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:267) [hbase-client-0.98.6-cdh5.3.6.jar:na]
22     at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:199) [hbase-client-0.98.6-cdh5.3.6.jar:na]
23     at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:161) [hbase-client-0.98.6-cdh5.3.6.jar:na]
24     at com.jun.it2.SaveBolt.prepare(SaveBolt.java:27) [classes/:na]
25     at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(executor.clj:699) [storm-core-0.9.6.jar:0.9.6]
26     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) [storm-core-0.9.6.jar:0.9.6]
27     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
28     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

5.网上的解决方案

  1.下载winutils的windows版本

    GitHub上,有人提供了winutils的windows的版本,项目地址是:https://github.com/srccodes/hadoop-common-2.2.0-bin,直接下载此项目的zip包,下载后是文件名是hadoop-common-2.2.0-bin-master.zip,随便解压到一个目录

  2.配置环境变量

    增加用户变量HADOOP_HOME,值是下载的zip包解压的目录,然后在系统变量path里增加%HADOOP_HOME%\bin 即可。  

    再次运行程序,正常执行。

6.截图

  

7.添加配置文件

  在Windows下运行时,这一步是必须的。

  

8.最终执行效果

  

 五:PS---程序

1.主驱动类

 1 package com.jun.it2;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.generated.AlreadyAliveException;
 7 import backtype.storm.generated.InvalidTopologyException;
 8 import backtype.storm.generated.StormTopology;
 9 import backtype.storm.spout.SchemeAsMultiScheme;
10 import backtype.storm.topology.IRichSpout;
11 import backtype.storm.topology.TopologyBuilder;
12 import backtype.storm.tuple.Fields;
13 import org.apache.hadoop.fs.Path;
14 import storm.kafka.*;
15 
16 import java.io.File;
17 import java.io.IOException;
18 import java.util.UUID;
19 
20 public class WebLogStatictis {
21 
22     /**
23      * 主函数
24      * @param args
25      */
26     public static void main(String[] args) throws IOException {
27         WebLogStatictis webLogStatictis=new WebLogStatictis();
28         StormTopology stormTopology=webLogStatictis.createTopology();
29         Config config=new Config();
30         //集群或者本地
31         //conf.setNumAckers(4);
32         if(args == null || args.length == 0){
33             // 本地执行
34             LocalCluster localCluster = new LocalCluster();
35             localCluster.submitTopology("webloganalyse2", config , stormTopology);
36         }else{
37             // 提交到集群上执行
38             config.setNumWorkers(4); // 指定使用多少个进程来执行该Topology
39             try {
40                 StormSubmitter.submitTopology(args[0],config, stormTopology);
41             } catch (AlreadyAliveException e) {
42                 e.printStackTrace();
43             } catch (InvalidTopologyException e) {
44                 e.printStackTrace();
45             }
46         }
47 
48     }
49     /**
50      * 构造一个kafkaspout
51      * @return
52      */
53     private IRichSpout generateSpout(){
54         BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
55         String topic = "nginxlog";
56         String zkRoot = "/" + topic;
57         String id = UUID.randomUUID().toString();
58         SpoutConfig spoutConf = new SpoutConfig(hosts,topic,zkRoot,id);
59         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // 按字符串解析
60         spoutConf.forceFromStart = true;
61         KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
62         return kafkaSpout;
63     }
64 
65     public StormTopology createTopology() {
66         TopologyBuilder topologyBuilder=new TopologyBuilder();
67         //指定Spout
68         topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID,generateSpout());
69         //指定WebLogParserBolt
70         topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT,new WebLogParserBolt()).shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);
71         //指定CountKpiBolt:第一个参数是组件,第二个参数是流ID,第三个参数是分组字段
72         topologyBuilder.setBolt(WebLogConstants.COUNT_IP_BOLT, new CountKpiBolt(WebLogConstants.IP_KPI))
73                 .fieldsGrouping(WebLogConstants.WEB_LOG_PARSER_BOLT, WebLogConstants.IP_COUNT_STREAM, new Fields(WebLogConstants.IP));
74         //指定SaveBolt:汇总
75         topologyBuilder.setBolt(WebLogConstants.SAVE_BOLT ,new SaveBolt(),3)
76                 .shuffleGrouping(WebLogConstants.COUNT_IP_BOLT)
77         ;
78         return topologyBuilder.createTopology();
79     }
80 
81 }

2.常量类

 1 package com.jun.it2;
 2 
 3 public class WebLogConstants {
 4     //Spout与Bolt的ID
 5     public static String KAFKA_SPOUT_ID="kafkaSpoutId";
 6     public static final String WEB_LOG_PARSER_BOLT = "webLogParserBolt";
 7     public static final String COUNT_IP_BOLT = "countIpBolt";
 8     public static final String COUNT_BROWSER_BOLT = "countBrowserBolt";
 9     public static final String COUNT_OS_BOLT = "countOsBolt";
10     public static final String USER_AGENT_PARSER_BOLT = "userAgentParserBolt";
11     public static final String SAVE_BOLT = "saveBolt";
12 
13     //流ID
14     public  static final String IP_COUNT_STREAM = "ipCountStream";
15     public  static final String URL_PARSER_STREAM = "urlParserStream";
16     public  static final String HTTPREFER_PARSER_STREAM = "httpReferParserStream";
17     public  static final String USERAGENT_PARSER_STREAM = "userAgentParserStream";
18     public  static final String BROWSER_COUNT_STREAM = "browserCountStream";
19     public  static final String OS_COUNT_STREAM = "osCountStream";
20 
21 
22     //tuple key名称
23     public static final String DAY = "day";
24     public static final String HOUR = "hour";
25     public static final String MINUTE = "minute";
26     public static final String IP = "ip";
27     public static final String REQUEST_URL = "requestUrl";
28     public static final String HTTP_REFER = "httpRefer";
29     public static final String USERAGENT = "userAgent";
30     public static final String BROWSER = "browser";
31     public static final String OS = "os";
32     public static final String SERVERTIME_KPI = "serverTimeAndKpi";
33     public static final String KPI_COUNTS = "kpiCounts";
34 
35 
36     //kpi类型
37     public static final String IP_KPI = "I";
38     public static final String URL_KPI = "U";
39     public static final String BROWSER_KPI = "B";
40     public static final String OS_KPI = "O";
41 
42 
43     //Hbase
44     public static final String HBASE_TABLENAME = "weblogstatictis";
45     public static final String COLUMN_FAMILY = "info";
46 }

3.解析类

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Tuple;
 9 import backtype.storm.tuple.Values;
10 
11 import java.text.DateFormat;
12 import java.text.SimpleDateFormat;
13 import java.util.Date;
14 import java.util.Map;
15 import java.util.regex.Matcher;
16 import java.util.regex.Pattern;
17 
18 import static com.jun.it2.WebLogConstants.*;
19 
20 public class WebLogParserBolt implements IRichBolt {
21     private Pattern pattern;
22 
23     private OutputCollector  outputCollector;
24     @Override
25     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
26         pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \[([\d+]*)\] \"[^ ]* ([^ ]*) [^ ]*\" \d{3} \d+ \"([^"]*)\" \"([^"]*)\" \"[^ ]*\"");
27         this.outputCollector = outputCollector;
28     }
29 
30     @Override
31     public void execute(Tuple tuple) {
32         String webLog=tuple.getStringByField("str");
33         if(webLog!= null || !"".equals(webLog)){
34 
35             Matcher matcher = pattern.matcher(webLog);
36             if(matcher.find()){
37                 //
38                 String ip = matcher.group(1);
39                 String serverTimeStr = matcher.group(2);
40 
41                 // 处理时间
42                 long timestamp = Long.parseLong(serverTimeStr);
43                 Date date = new Date();
44                 date.setTime(timestamp);
45 
46                 DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
47                 String dateStr = df.format(date);
48                 String day = dateStr.substring(0,8);
49                 String hour = dateStr.substring(0,10);
50                 String minute = dateStr ;
51 
52                 String requestUrl = matcher.group(3);
53                 String httpRefer = matcher.group(4);
54                 String userAgent = matcher.group(5);
55 
56                 //可以验证是否匹配正确
57 //                System.err.println(webLog);
58 //                System.err.println(
59 //                        "ip=" + ip
60 //                        + ", serverTimeStr=" + serverTimeStr
61 //                        +", requestUrl=" + requestUrl
62 //                        +", httpRefer=" + httpRefer
63 //                        +", userAgent=" + userAgent
64 //                );
65 
66                 //分流
67                 this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple,new Values(day, hour, minute, ip));
68                 this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple,new Values(day, hour, minute, requestUrl));
69                 this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple,new Values(day, hour, minute, httpRefer));
70                 this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple,new Values(day, hour, minute, userAgent));
71             }
72         }
73         this.outputCollector.ack(tuple);
74 
75     }
76 
77     @Override
78     public void cleanup() {
79 
80     }
81 
82     @Override
83     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
84         outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM,new Fields(DAY, HOUR, MINUTE, IP));
85         outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
86         outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
87         outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM,new Fields(DAY, HOUR, MINUTE, USERAGENT));
88     }
89 
90     @Override
91     public Map<String, Object> getComponentConfiguration() {
92         return null;
93     }
94 }

4.计算类

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Tuple;
 9 import backtype.storm.tuple.Values;
10 
11 import java.util.HashMap;
12 import java.util.Iterator;
13 import java.util.Map;
14 
15 public class CountKpiBolt implements IRichBolt {
16 
17     private String kpiType;
18 
19     private Map<String,Integer> kpiCounts;
20 
21     private String currentDay = "";
22 
23     private OutputCollector outputCollector;
24 
25     public CountKpiBolt(String kpiType){
26         this.kpiType = kpiType;
27     }
28 
29     @Override
30     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
31         this.kpiCounts = new HashMap<>();
32         this.outputCollector = outputCollector;
33     }
34 
35     @Override
36     public void execute(Tuple tuple) {
37         String day = tuple.getStringByField("day");
38         String hour = tuple.getStringByField("hour");
39         String minute = tuple.getStringByField("minute");
40         String kpi = tuple.getString(3);
41         //日期与KPI组合
42         String kpiByDay = day + "_" + kpi;
43         String kpiByHour = hour +"_" + kpi;
44         String kpiByMinute = minute + "_" + kpi;
45         //将计数信息存放到Map中
46         int kpiCountByDay = 0;
47         int kpiCountByHour = 0;
48         int kpiCountByMinute = 0;
49         if(kpiCounts.containsKey(kpiByDay)){
50             kpiCountByDay = kpiCounts.get(kpiByDay);
51         }
52         if(kpiCounts.containsKey(kpiByHour)){
53             kpiCountByHour = kpiCounts.get(kpiByHour);
54         }
55         if(kpiCounts.containsKey(kpiByMinute)){
56             kpiCountByMinute = kpiCounts.get(kpiByMinute);
57         }
58         kpiCountByDay ++;
59         kpiCountByHour ++;
60         kpiCountByMinute ++;
61         kpiCounts.put(kpiByDay, kpiCountByDay);
62         kpiCounts.put(kpiByHour, kpiCountByHour);
63         kpiCounts.put(kpiByMinute,kpiCountByMinute);
64         //隔天清空内存
65         if(!currentDay.equals(day)){
66             // 说明隔天了
67             Iterator<Map.Entry<String,Integer>> iter = kpiCounts.entrySet().iterator();
68             while(iter.hasNext()){
69                 Map.Entry<String,Integer> entry = iter.next();
70                 if(entry.getKey().startsWith(currentDay)){
71                     iter.remove();
72                 }
73             }
74         }
75         currentDay = day;
76         //发射
77         //发射两个字段
78         this.outputCollector.emit(tuple, new Values(kpiType+"_" + kpiByDay, kpiCountByDay));
79         this.outputCollector.emit(tuple, new Values(kpiType+"_" + kpiByHour, kpiCountByHour));
80         this.outputCollector.emit(tuple, new Values(kpiType+"_" + kpiByMinute, kpiCountByMinute));
81         this.outputCollector.ack(tuple);
82 
83     }
84 
85     @Override
86     public void cleanup() {
87 
88     }
89 
90     @Override
91     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
92         outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
93     }
94 
95     @Override
96     public Map<String, Object> getComponentConfiguration() {
97         return null;
98     }
99 }

5.保存类

 1 package com.jun.it2;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.IRichBolt;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.tuple.Tuple;
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.hbase.HBaseConfiguration;
10 import org.apache.hadoop.hbase.client.HTable;
11 import org.apache.hadoop.hbase.client.Put;
12 import org.apache.hadoop.hbase.util.Bytes;
13 
14 import java.io.IOException;
15 import java.util.Map;
16 
17 import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;
18 
19 public class SaveBolt implements IRichBolt {
20     private HTable table;
21 
22     private OutputCollector outputCollector;
23     @Override
24     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
25         Configuration configuration = HBaseConfiguration.create();
26         try {
27             table = new HTable(configuration,HBASE_TABLENAME);
28         } catch (IOException e) {
29             e.printStackTrace();
30             throw new RuntimeException(e);
31         }
32 
33         this.outputCollector = outputCollector;
34     }
35 
36     @Override
37     public void execute(Tuple tuple) {
38         String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
39         Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
40         System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
41         if(serverTimeAndKpi!= null && kpiCounts != null){
42 
43             Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
44             String columnQuelifier = serverTimeAndKpi.split("_")[0];
45             put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
46                     Bytes.toBytes(columnQuelifier),Bytes.toBytes(""+kpiCounts));
47 
48             try {
49                 table.put(put);
50             } catch (IOException e) {
51                 throw new RuntimeException(e);
52             }
53         }
54         this.outputCollector.ack(tuple);
55     }
56 
57     @Override
58     public void cleanup() {
59         if(table!= null){
60             try {
61                 table.close();
62             } catch (IOException e) {
63                 e.printStackTrace();
64             }
65         }
66     }
67 
68     @Override
69     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
70 
71     }
72 
73     @Override
74     public Map<String, Object> getComponentConfiguration() {
75         return null;
76     }
77 }

  

  

原文地址:https://www.cnblogs.com/juncaoit/p/9148100.html