Storm实现数字累加Demo

  1 import java.util.Map;
  2 
  3 import backtype.storm.Config;
  4 import backtype.storm.LocalCluster;
  5 import backtype.storm.spout.SpoutOutputCollector;
  6 import backtype.storm.task.OutputCollector;
  7 import backtype.storm.task.TopologyContext;
  8 import backtype.storm.topology.OutputFieldsDeclarer;
  9 import backtype.storm.topology.TopologyBuilder;
 10 import backtype.storm.topology.base.BaseRichBolt;
 11 import backtype.storm.topology.base.BaseRichSpout;
 12 import backtype.storm.tuple.Fields;
 13 import backtype.storm.tuple.Tuple;
 14 import backtype.storm.tuple.Values;
 15 import backtype.storm.utils.Utils;
 16 
 17 /**
 18  * 数字累加求和
 19  * 先添加storm依赖
 20  * 
 21  * @author Administrator
 22  *
 23  */
 24 public class LocalTopologySum {
 25     
 26     
 27     /**
 28      * spout需要继承baserichspout,实现未实现的方法
 29      * @author Administrator
 30      *
 31      */
 32     public static class MySpout extends BaseRichSpout{
 33         private Map conf;
 34         private TopologyContext context;
 35         private SpoutOutputCollector collector;
 36         
 37         /**
 38          * 初始化方法,只会执行一次
 39          * 在这里面可以写一个初始化的代码
 40          * Map conf:其实里面保存的是topology的一些配置信息
 41          * TopologyContext context:topology的上下文,类似于servletcontext
 42          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
 43          */
 44         @Override
 45         public void open(Map conf, TopologyContext context,
 46                 SpoutOutputCollector collector) {
 47             this.conf = conf;
 48             this.context = context;
 49             this.collector = collector;
 50         }
 51 
 52         int num = 1;
 53         /**
 54          * 这个方法是spout中最重要的方法,
 55          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
 56          * 每调用一次,会向外发射一条数据
 57          */
 58         @Override
 59         public void nextTuple() {
 60             System.out.println("spout发射:"+num);
 61             //把数据封装到values中,称为一个tuple,发射出去
 62             this.collector.emit(new Values(num++));
 63             Utils.sleep(1000);
 64         }
 65         
 66         /**
 67          * 声明输出字段
 68          */
 69         @Override
 70         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 71             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
 72             //fields中定义的参数和values中传递的数值是一一对应的
 73             declarer.declare(new Fields("num"));
 74         }
 75         
 76     }
 77     
 78     
 79     /**
 80      * 自定义bolt需要实现baserichbolt
 81      * @author Administrator
 82      *
 83      */
 84     public static class MyBolt extends BaseRichBolt{
 85         private Map stormConf; 
 86         private TopologyContext context;
 87         private OutputCollector collector;
 88         
 89         /**
 90          * 和spout中的open方法意义一样
 91          */
 92         @Override
 93         public void prepare(Map stormConf, TopologyContext context,
 94                 OutputCollector collector) {
 95             this.stormConf = stormConf;
 96             this.context = context;
 97             this.collector = collector;
 98         }
 99 
100         int sum = 0;
101         /**
102          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
103          */
104         @Override
105         public void execute(Tuple input) {
106             //input.getInteger(0);//也可以根据角标获取tuple中的数据
107             Integer value = input.getIntegerByField("num");
108             sum+=value;
109             System.out.println("和:"+sum);
110         }
111         
112         /**
113          * 声明输出字段
114          */
115         @Override
116         public void declareOutputFields(OutputFieldsDeclarer declarer) {
117             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
118             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
119         }
120         
121     }
122     /**
123      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
124      * @param args
125      */
126     public static void main(String[] args) {
127         //组装topology
128         TopologyBuilder topologyBuilder = new TopologyBuilder();
129         topologyBuilder.setSpout("spout1", new MySpout());
130         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
131         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
132         
133         //创建本地storm集群
134         LocalCluster localCluster = new LocalCluster();
135         Config config = new Config();
136         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
137     }
138 
139 }
原文地址:https://www.cnblogs.com/DreamDrive/p/5774957.html