Storm 运行例子

1.建立Java工程

使用idea,添加lib库,拷贝storm中lib到工程中

2.拷贝wordcount代码

下载src包,解压找到

apache-storm-0.9.4-src\apache-storm-0.9.4\examples\storm-starter\src\jvm\storm\starter 目录下

拷贝WordCountTopology.java内容;

修改python处理方式(原示例中的SplitSentence通过ShellBolt调用python脚本来切分句子,这里改为直接用Java实现);

 1 import backtype.storm.Config;
 2 import backtype.storm.LocalCluster;
 3 import backtype.storm.StormSubmitter;
 4 import backtype.storm.task.ShellBolt;
 5 import backtype.storm.topology.BasicOutputCollector;
 6 import backtype.storm.topology.IRichBolt;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.TopologyBuilder;
 9 import backtype.storm.topology.base.BaseBasicBolt;
10 import backtype.storm.tuple.Fields;
11 import backtype.storm.tuple.Tuple;
12 import backtype.storm.tuple.Values;
13 import com.bigdata.storm.spout.*;
14 
15 import java.util.HashMap;
16 import java.util.Map;
17 /**
18  * Created by Edward on 2016/8/17.
19  */
20 public class MyTest {
21 
22     public static class SplitSentence extends BaseBasicBolt{
23 
24         @Override
25         public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
26             String word = tuple.getString(0);
27             String str[] = word.split(" ");
28 
29             System.out.println("Split Sentence:" + tuple.getSourceStreamId());
30             for(int i=0; i<str.length; i++)
31             {
32                 basicOutputCollector.emit(new Values(str[i]));
33             }
34         }
35 
36         @Override
37         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
38             outputFieldsDeclarer.declare(new Fields("word"));
39         }
40     }
41 
42     public static class WordCount extends BaseBasicBolt {
43         Map<String, Integer> counts = new HashMap<String, Integer>();
44 
45         @Override
46         public void execute(Tuple tuple, BasicOutputCollector collector) {
47             String word = tuple.getString(0);
48             Integer count = counts.get(word);
49             if (count == null)
50                 count = 0;
51             count++;
52             counts.put(word, count);
53             System.out.println("Word Count:" + tuple.getSourceStreamId());
54             collector.emit(new Values(word, count));
55 
56         }
57 
58         @Override
59         public void declareOutputFields(OutputFieldsDeclarer declarer) {
60             declarer.declare(new Fields("word", "count"));
61         }
62     }
63 
64     public static void main(String[] args) throws Exception {
65 
66         TopologyBuilder builder = new TopologyBuilder();
67 
68         builder.setSpout("spout", new RandomSentenceSpout(), 5);
69 
70         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
71         builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
72 
73         Config conf = new Config();
74         conf.setDebug(true);
75 
76 
77         if (args != null && args.length > 0) {
78             conf.setNumWorkers(3);
79 
80             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
81         }
82         else {
83             conf.setMaxTaskParallelism(3);
84 
85             LocalCluster cluster = new LocalCluster();
86             cluster.submitTopology("word-count", conf, builder.createTopology());
87 
88             Thread.sleep(50000);
89 
90             cluster.shutdown();
91         }
92     }
93 }

3.拷贝随机生成spout代码

找到 apache-storm-0.9.4-src\apache-storm-0.9.4\examples\storm-starter\src\jvm\storm\starter\spout

拷贝RandomSentenceSpout.java到工程中

 1 import backtype.storm.spout.SpoutOutputCollector;
 2 import backtype.storm.task.TopologyContext;
 3 import backtype.storm.topology.OutputFieldsDeclarer;
 4 import backtype.storm.topology.base.BaseRichSpout;
 5 import backtype.storm.tuple.Fields;
 6 import backtype.storm.tuple.Values;
 7 import backtype.storm.utils.Utils;
 8 
 9 import java.util.Map;
10 import java.util.Random;
11 
12 public class RandomSentenceSpout extends BaseRichSpout {
13     SpoutOutputCollector _collector;
14     Random _rand;
15 
16     @Override
17     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
18         _collector = collector;
19         _rand = new Random();
20     }
21 
22     @Override
23     public void nextTuple() {
24         Utils.sleep(10000);
25         String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
26                 "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
27         String sentence = sentences[_rand.nextInt(sentences.length)];
28         Object id = new Object();
29         System.out.println(id);
30         //id message ID 用来保证可靠性的,如果失败fail 会返回 message id 信息
31         _collector.emit(new Values(sentence), id);
32     }
33 
34     @Override
35     public void ack(Object id) {
36         System.out.println("storm spout ack id = "+id);
37     }
38 
39     @Override
40     public void fail(Object id) {
41     }
42 
43     @Override
44     public void declareOutputFields(OutputFieldsDeclarer declarer) {
45         declarer.declare(new Fields("word"));
46     }
47 
48 }

4.本地运行

在idea中直接点击运行,观察运行过程

5.集群运行

将程序打包成jar,然后放到集群中运行。

把jar包放到storm/test目录下,执行:

bin/storm jar test/storm-sample.jar com.bigdata.storm.MyTest WordCount
原文地址:https://www.cnblogs.com/one--way/p/5781022.html