storm入门——本地模式helloworld

创建maven项目,在pom.xml中加入以下配置:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <type>jar</type>
        <version>0.9.3-rc1</version>
    </dependency>

创建SimpleSpout类用于获取数据流:

 1 package com.hirain.storm.helloworld;
 2 
 3 import java.util.Map;
 4 import java.util.Random;
 5 
 6 import backtype.storm.spout.SpoutOutputCollector;
 7 import backtype.storm.task.TopologyContext;
 8 import backtype.storm.topology.OutputFieldsDeclarer;
 9 import backtype.storm.topology.base.BaseRichSpout;
10 import backtype.storm.tuple.Fields;
11 import backtype.storm.tuple.Values;
12 
13 public class SimpleSpout extends BaseRichSpout{
14 
15     /**
16      * 
17      */
18     private static final long serialVersionUID = 1L;
19     
20     //用来发射数据的工具类
21     private SpoutOutputCollector collector;
22     
23     private static String[] info = new String[]{
24         "comaple	,12424,44w46,654,12424,44w46,654,",
25         "lisi	,435435,6537,12424,44w46,654,",
26         "lipeng	,45735,6757,12424,44w46,654,",
27         "hujintao	,45735,6757,12424,44w46,654,",
28         "jiangmin	,23545,6457,2455,7576,qr44453",
29         "beijing	,435435,6537,12424,44w46,654,",
30         "xiaoming	,46654,8579,w3675,85877,077998,",
31         "xiaozhang	,9789,788,97978,656,345235,09889,",
32         "ceo	,46654,8579,w3675,85877,077998,",
33         "cto	,46654,8579,w3675,85877,077998,",
34         "zhansan	,46654,8579,w3675,85877,077998,"};
35     
36     Random random=new Random();
37     
38     
39     /**
40      * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用
41      */
42     public void nextTuple() {
43         try {
44              String msg = info[random.nextInt(11)];
45             // 调用发射方法
46             collector.emit(new Values(msg));
47             // 模拟等待100ms
48             Thread.sleep(100);
49         } catch (InterruptedException e) {
50             e.printStackTrace();
51         }
52     }
53     /**
54      * 初始化collector
55      */
56     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
57         this.collector = collector;
58         
59     }
60     
61     
62     /**
63      * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
64      * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构
65      */
66     public void declareOutputFields(OutputFieldsDeclarer declarer) {
67         declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应
68     }
69 
70 }

创建SimpleBolt类,用于处理数据:

 1 package com.hirain.storm.helloworld;
 2 
 3 import backtype.storm.topology.BasicOutputCollector;
 4 import backtype.storm.topology.OutputFieldsDeclarer;
 5 import backtype.storm.topology.base.BaseBasicBolt;
 6 import backtype.storm.tuple.Fields;
 7 import backtype.storm.tuple.Tuple;
 8 import backtype.storm.tuple.Values;
 9 
10 
11 
12 public class SimpleBolt extends BaseBasicBolt {
13 
14     /**
15      * 
16      */
17     private static final long serialVersionUID = 1L;
18 
19     public void execute(Tuple input,BasicOutputCollector collector) {
20         try {
21             String msg = input.getString(0);
22             if (msg != null){
23                 //System.out.println("msg="+msg);
24                 collector.emit(new Values(msg + "msg is processed!"));
25             }
26                 
27         } catch (Exception e) {
28             e.printStackTrace(); 
29         }
30 
31     }
32 
33     public void declareOutputFields(
34             OutputFieldsDeclarer declarer) {
35         declarer.declare(new Fields("info"));
36 
37     }
38 
39 }

创建main方法配置storm的topology并启动本地模式运行:

 1 package com.hirain.storm.helloworld;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.topology.TopologyBuilder;
 7 
 8 public class SimpleTopology {
 9     
10     
11     public static void main(String[] args) {
12         try {
13             // 实例化TopologyBuilder类。
14             TopologyBuilder topologyBuilder = new TopologyBuilder();
15             // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
16             topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);
17             // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。
18             topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");
19             Config config = new Config();
20             config.setDebug(true);
21             if (args != null && args.length > 0) {
22                 config.setNumWorkers(1);
23                 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
24             } else {
25                 // 这里是本地模式下运行的启动代码。
26                 config.setMaxTaskParallelism(1);
27                 LocalCluster cluster = new LocalCluster();
28                 cluster.submitTopology("simple", config, topologyBuilder.createTopology());
29             }
30             
31         } catch (Exception e) {
32             e.printStackTrace(); 
33         }
34     }
35 }

以上为storm的简单的helloworld,仅供参考

转载请注明出处,期待共同进步...
原文地址:https://www.cnblogs.com/zhangyukun/p/4031066.html