[BD] Storm

什么是实时计算

  • 离线计算:批处理,代表MapReduce、Spark Core,采集数据Sqoop、Flume
  • 实时计算:源源不断,代表Storm等,采集数据Flume
  • 框架
    • Apache Storm
    • Spark Streaming:把流式数据转换成离散数据,本质是离线计算
    • JStrom:阿里基于Strom开发
    • Flink

 环境搭建

  • 伪分布
    • storm nimbus &
    • storm supervisor &
    • storm ui &

  • 全分布

编程案例 WordCount

  • 启用Debug,日志查看器,在网页上查看数据
    • "topology.eventlogger.executors": 1
    • /root/training/apache-storm-1.0.3/examples/storm-starter 
    • storm jar storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology MyWC
    • storm logviewer &

WordCountTopology.java

  1 package demo.wc;
  2 
  3 import org.apache.storm.Config;
  4 import org.apache.storm.LocalCluster;
  5 import org.apache.storm.StormSubmitter;
  6 import org.apache.storm.generated.StormTopology;
  7 import org.apache.storm.hdfs.bolt.HdfsBolt;
  8 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
  9 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
 10 import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
 11 import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
 12 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 13 import org.apache.storm.redis.bolt.RedisStoreBolt;
 14 import org.apache.storm.redis.common.config.JedisPoolConfig;
 15 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 16 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 17 import org.apache.storm.topology.IRichBolt;
 18 import org.apache.storm.topology.TopologyBuilder;
 19 import org.apache.storm.tuple.Fields;
 20 import org.apache.storm.tuple.ITuple;
 21 
 22 
 23 //任务的主程序,创建任务:Topology
 24 public class WordCountTopology {
 25 
 26     public static void main(String[] args) throws Exception {
 27         TopologyBuilder builder = new TopologyBuilder();
 28         
 29         //设置任务的spout组件
 30         builder.setSpout("wordcount_spout", new WordCountSpout());
 31         
 32         //设置任务的单词拆分的bolt组件,是随机分组
 33         builder.setBolt("wordcount_split", new WordCountSplitBolt()).shuffleGrouping("wordcount_spout");
 34         
 35         //设置任务的单词计数的bolt组件,是按字段分组
 36         builder.setBolt("wordcount_total", new WordCountTotalBolt()).fieldsGrouping("wordcount_split", new Fields("word"));
 37         
 38         //设置任务的第三个Bolt组件,将结果保存到Redis,直接使用Storm提供的BOlt
 39         //builder.setBolt("wordcount_redis", createRedisBolt()).shuffleGrouping("wordcount_total");
 40         
 41         //设置任务的第三个Bolt组件,将结果保存到HDFS(文件),直接使用Storm提供的Bolt
 42         builder.setBolt("wordcount_hdfs", createHDFSBolt()).shuffleGrouping("wordcount_total");
 43         
 44         //设置任务的第三个Bolt组件,将结果保存到HBase中
 45         //builder.setBolt("wordcount_hbase", new WordCountHBaseBolt()).shuffleGrouping("wordcount_total");
 46         
 47         
 48         //创建一个任务:Topology
 49         StormTopology topology = builder.createTopology();
 50         
 51         //创建一个Config对象,保存配置信息
 52         Config conf = new Config();
 53         
 54         /*
 55          * 提交Storm的任务有两种方式
 56          * 1、本地模式
 57          * 2、集群模式
 58          */
 59         LocalCluster cluster = new LocalCluster();
 60         cluster.submitTopology("MyWordCount", conf, topology);
 61         
 62 //        StormSubmitter.submitTopology("MyWordCount", conf, topology);
 63 
 64     }
 65 
 66     private static IRichBolt createHDFSBolt() {
 67         // 将结果保存到HDFS 文件
 68         
 69         HdfsBolt bolt = new HdfsBolt();
 70         //设置HDFS的相关配置信息
 71         //HDFS的位置:NameNode的地址
 72         bolt.withFsUrl("hdfs://192.168.174.111:9000");
 73         
 74         //设置保存的HDFS的目录
 75         bolt.withFileNameFormat(new DefaultFileNameFormat().withPath("/stormdata"));
 76         
 77         //保存的是<key value>,设置数据保存到文件的时候,分隔符 |
 78         //举例:<Beijing,10>   ----> 结果: Beijing|10
 79         bolt.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"));
 80         
 81         //流式处理,多大的数据生成一个文件?
 82         //每5M的数据生成一个文件
 83         bolt.withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
 84         
 85         //当输出tuple达到了一定大小,就会跟HDFS进行一次同步
 86         bolt.withSyncPolicy(new CountSyncPolicy(1000));
 87         
 88         
 89         return bolt;
 90     }
 91 
 92     private static IRichBolt createRedisBolt() {
 93         //把单词计数是结果保存到Redis
 94         
 95         //创建Redis的连接池
 96         JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
 97         builder.setHost("192.168.174.111");
 98         builder.setPort(6379);
 99         JedisPoolConfig poolConfig = builder.build();
100         
101         //参数:StoreMapper:用于指定存入Redis中的数据格式
102         return new RedisStoreBolt(poolConfig, new RedisStoreMapper() {
103             
104             @Override
105             public RedisDataTypeDescription getDataTypeDescription() {
106                 //定义Redis中的数据类型:WordCount采用什么数据类型?
107                 //使用Hash集合
108                 return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,
109                                                     "wordcount");
110             }
111             
112             @Override
113             public String getValueFromTuple(ITuple tuple) {
114                 // 从上一个Tuple中取出值:频率
115                 return String.valueOf(tuple.getIntegerByField("total"));
116             }
117             
118             @Override
119             public String getKeyFromTuple(ITuple tuple) {
120                 // 从上一个Tuple中取出key:单词
121                 return tuple.getStringByField("word");
122             }
123         });
124     }
125 }
View Code

WordCountSpout.java

 1 package demo.wc;
 2 
 3 import java.util.Map;
 4 import java.util.Random;
 5 
 6 import org.apache.storm.spout.SpoutOutputCollector;
 7 import org.apache.storm.task.TopologyContext;
 8 import org.apache.storm.topology.OutputFieldsDeclarer;
 9 import org.apache.storm.topology.base.BaseRichSpout;
10 import org.apache.storm.tuple.Fields;
11 import org.apache.storm.tuple.Values;
12 import org.apache.storm.utils.Utils;
13 
14 //第一级组件,作为任务的Spout组件,来采集数据
15 //模拟一些数据,随机产生数据
16 public class WordCountSpout extends BaseRichSpout {
17 
18     //定义我们要产生的数据
19     private String[] datas = {"I love Beijing","I love China","Beijing is the capital of China"};
20 
21     //定义一个变量来保存输出流
22     private SpoutOutputCollector collector;
23     
24     @Override
25     public void nextTuple() {
26         //每隔2秒采集一次
27         Utils.sleep(2000);
28         
29         // 由Storm的框架调用,用于如何接受数据
30         //产生一个3以内的随机数
31         int random = (new Random()).nextInt(3);
32         //数据
33         String data = datas[random];
34         
35         //把数据发送给下一个组件
36         //数据一定要遵循schema的结构
37         System.out.println("采集的数据是:" + data);
38         this.collector.emit(new Values(data));
39     }
40 
41     @Override
42     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
43         //相当于Spout初始化方法
44         //参数:SpoutOutputCollector collector 相当于是输出流
45         this.collector = collector;
46     }
47 
48     @Override
49     public void declareOutputFields(OutputFieldsDeclarer declare) {
50         // 申明Tuple的格式,是Schema
51         declare.declare(new Fields("sentence"));
52     }
53 }
View Code

WordCountSplitBolt.java

 1 package demo.wc;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.storm.task.OutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.OutputFieldsDeclarer;
 8 import org.apache.storm.topology.base.BaseRichBolt;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Tuple;
11 import org.apache.storm.tuple.Values;
12 
13 //第二级组件,是bolt组件,用于单词的拆分
14 public class WordCountSplitBolt extends BaseRichBolt{
15 
16     private OutputCollector collector;
17     
18     @Override
19     public void execute(Tuple tuple) {
20         //如何处理上一级组件发来的数据: I love Beijing
21         String data = tuple.getStringByField("sentence");
22         
23         //分词
24         String[] words = data.split(" ");
25         
26         //输出
27         for(String w:words){
28             collector.emit(new Values(w,1));
29         }
30     }
31 
32     @Override
33     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
34         // 对Bolt进行初始化
35         this.collector = collector;
36     }
37 
38     @Override
39     public void declareOutputFields(OutputFieldsDeclarer declare) {
40         //申明Tuple的格式
41         declare.declare(new Fields("word","count"));
42         
43     }
44 }
View Code

WordCountTotalBolt.java

 1 package demo.wc;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import org.apache.storm.task.OutputCollector;
 7 import org.apache.storm.task.TopologyContext;
 8 import org.apache.storm.topology.OutputFieldsDeclarer;
 9 import org.apache.storm.topology.base.BaseRichBolt;
10 import org.apache.storm.tuple.Fields;
11 import org.apache.storm.tuple.Tuple;
12 import org.apache.storm.tuple.Values;
13 
14 //第三级组件,是bolt组件,用于单词的计数
15 public class WordCountTotalBolt extends BaseRichBolt {
16 
17     private OutputCollector collector;
18     
19     //定义一个Map集合来保存结果
20     private Map<String, Integer> result = new HashMap<>();
21     
22     @Override
23     public void execute(Tuple tuple) {
24         // 对每个单词进行计数
25         //取出数据
26         String word = tuple.getStringByField("word");
27         int count = tuple.getIntegerByField("count");
28         
29         if(result.containsKey(word)){
30             //如果包含,进行累加
31             int total = result.get(word);
32             result.put(word, total+count);
33         }else{
34             //这个单词第一次出现
35             result.put(word, count);
36         }
37         
38         //打印在屏幕上
39         System.out.println("统计的结果是: " + result);
40         
41         //把结果继续发送给下一个bolt组件:  (单词,频率)
42         this.collector.emit(new Values(word,result.get(word)));
43     }
44 
45     @Override
46     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
47         // TODO Auto-generated method stub
48         this.collector = collector;
49     }
50 
51     @Override
52     public void declareOutputFields(OutputFieldsDeclarer declare) {
53         // TODO Auto-generated method stub
54         declare.declare(new Fields("word","total"));
55     }
56 }
View Code

编程模型

  • Topology:Storm中运行的一个实时应用程序
  • Stream:数据流向
  • Spout:在一个Topology中获取源数据流的组件
  • Bolt:接收数据然后执行处理的组件,可级联
  • Tuple:一次消息传递的基本单元
  • StreamGroup:数据分组策略
    • 随机分组:1-2之间
    • 按字段分组:2-3之间
    • 广播分组

流式计算架构

  • Flume:获取数据
  • Kafka:临时保存数据
  • Storm:计算数据
  • Redis:保存数据

 原理分析

  • Storm在ZK中保存的数据

  • Storm提交任务的过程

 

  • Storm内部通信的机制

外部集成

  • Redis
    • 添加依赖jar包,在WordCountTopology中编写Bolt组件
    • 创建连接池

  • JDBC
  • HDFS:storm-hdfs***.jar
  • HBase:自己开发一个Bolt组件
  • Kafka
  • Hive

参考

大数据实时计算框架

https://www.csdn.net/gather_21/MtTacgxsMDI1Mi1ibG9n.html

原文地址:https://www.cnblogs.com/cxc1357/p/12713182.html