【转】storm 开发系列一 第一个程序

原文: http://blog.csdn.net/csfreebird/article/details/49104777

-------------------------------------------------------------------------------------------------

本文将在本地开发环境创建一个storm程序,力求简单。

首先用mvn创建一个简单的工程hello_storm

[plain] view plain copy
 
 print?
  1. mvn archetype:generate -DgroupId=org.csfreebird -DartifactId=hello_storm -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false  


编辑pom.xml,添加dependency

[html] view plain copy
 
 print?
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">  
  3.   <modelVersion>4.0.0</modelVersion>  
  4.   <groupId>org.csfreebird</groupId>  
  5.   <artifactId>hello_storm</artifactId>  
  6.   <version>0.9.5</version>  
  7.   <packaging>jar</packaging>  
  8.   <name>hello_storm</name>  
  9.   <url>http://maven.apache.org</url>  
  10.   <dependencies>  
  11.     <dependency>  
  12.       <groupId>org.apache.storm</groupId>  
  13.       <artifactId>storm-core</artifactId>  
  14.       <version>${project.version}</version>  
  15.       <!-- keep storm out of the jar-with-dependencies -->  
  16.       <scope>provided</scope>  
  17.     </dependency>  
  18.   </dependencies>  
  19. </project>  


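provided 表示storm-core的jar包只在编译和测试时使用,在集群环境下运行时完全依赖集群环境自带的storm-core的jar包。另外注意,这里storm-core的版本写成了${project.version},之所以可行,是因为本工程的版本号0.9.5恰好与所用的storm版本号相同;如果修改了工程的版本号,需要把这里改成具体的storm版本号。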

然后将App.java重命名为HelloTopology.java,开始编码。模仿之前的Example,这里将所有的spout/bolt类都定义为静态嵌套类,统一放在HelloTopology.java文件中。

功能如下:HelloSpout每隔100毫秒从一组固定的国家名中随机选出一个作为tuple发出;HelloBolt收到后在该单词前加上"hello,"再发出,并对原tuple进行ack。

编写HelloTopology.java代码。spout的代码来自于TestWordSpout,去掉了其中log相关的代码,并把以下划线(_)开头的成员变量名改成了不带下划线的写法。

[plain] view plain copy
 
 print?
  1. package org.csfreebird;  
  2.   
  3. import backtype.storm.Config;  
  4. import backtype.storm.LocalCluster;  
  5. import backtype.storm.StormSubmitter;  
  6. import backtype.storm.task.OutputCollector;  
  7. import backtype.storm.task.TopologyContext;  
  8. import backtype.storm.testing.TestWordSpout;  
  9. import backtype.storm.topology.OutputFieldsDeclarer;  
  10. import backtype.storm.topology.TopologyBuilder;  
  11. import backtype.storm.topology.base.BaseRichBolt;  
  12. import backtype.storm.topology.base.BaseRichSpout;  
  13. import backtype.storm.tuple.Fields;  
  14. import backtype.storm.tuple.Tuple;  
  15. import backtype.storm.tuple.Values;  
  16. import backtype.storm.utils.Utils;  
  17. import backtype.storm.spout.SpoutOutputCollector;  
  18. import java.util.Map;  
  19. import java.util.TreeMap;  
  20. import java.util.Random;  
  21.   
  22. public class HelloTopology {  
  23.       
  24.     public static class HelloSpout extends BaseRichSpout {  
  25.       
  26.     boolean isDistributed;  
  27.     SpoutOutputCollector collector;  
  28.   
  29.     public HelloSpout() {  
  30.         this(true);  
  31.     }  
  32.   
  33.     public HelloSpout(boolean isDistributed) {  
  34.         this.isDistributed = isDistributed;  
  35.     }  
  36.           
  37.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {  
  38.         this.collector = collector;  
  39.     }  
  40.       
  41.     public void close() {  
  42.     }  
  43.           
  44.     public void nextTuple() {  
  45.         Utils.sleep(100);  
  46.         final String[] words = new String[] {"china", "usa", "japan", "russia", "england"};  
  47.         final Random rand = new Random();  
  48.         final String word = words[rand.nextInt(words.length)];  
  49.         this.collector.emit(new Values(word));  
  50.     }  
  51.       
  52.     public void ack(Object msgId) {  
  53.     }  
  54.   
  55.     public void fail(Object msgId) {  
  56.     }  
  57.       
  58.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  59.         declarer.declare(new Fields("word"));  
  60.     }  
  61.   
  62.     @Override  
  63.     public Map<String, Object> getComponentConfiguration() {  
  64.         if(!this.isDistributed) {  
  65.         Map<String, Object> ret = new TreeMap<String, Object>();  
  66.         ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);  
  67.         return ret;  
  68.         } else {  
  69.         return null;  
  70.         }  
  71.     }  
  72.     }  
  73.   
  74.     public static class HelloBolt extends BaseRichBolt {  
  75.     OutputCollector collector;  
  76.   
  77.     @Override  
  78.     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {  
  79.         this.collector = collector;  
  80.     }  
  81.   
  82.     @Override  
  83.     public void execute(Tuple tuple) {  
  84.         this.collector.emit(tuple, new Values("hello," + tuple.getString(0)));  
  85.         this.collector.ack(tuple);  
  86.     }  
  87.   
  88.     @Override  
  89.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  90.         declarer.declare(new Fields("word"));  
  91.     }  
  92.     }  
  93.       
  94.     public static void main(String[] args) throws Exception {  
  95.       
  96.     TopologyBuilder builder = new TopologyBuilder();  
  97.     builder.setSpout("a", new HelloSpout(), 10);  
  98.     builder.setBolt("b", new HelloBolt(), 5).shuffleGrouping("a");  
  99.   
  100.     Config conf = new Config();  
  101.     conf.setDebug(true);  
  102.   
  103.     if (args != null && args.length > 0) {  
  104.         conf.setNumWorkers(3);  
  105.         StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());  
  106.     } else {  
  107.         String test_id = "hello_test";  
  108.         LocalCluster cluster = new LocalCluster();  
  109.         cluster.submitTopology(test_id, conf, builder.createTopology());  
  110.         Utils.sleep(10000);  
  111.         cluster.killTopology(test_id);  
  112.         cluster.shutdown();  
  113.     }  
  114.     }     
  115. }  


执行下面的命令进行编译,应当能够编译成功:

[plain] view plain copy
 
 print?
  1. mvn clean compile  


为了能够在本地模式运行,需要在pom.xml中添加如下:

[html] view plain copy
 
 print?
  1. <build>  
  2.   <plugins>  
  3.     <plugin>  
  4.       <groupId>org.codehaus.mojo</groupId>  
  5.       <artifactId>exec-maven-plugin</artifactId>  
  6.       <version>1.2.1</version>  
  7.       <executions>  
  8.         <execution>  
  9.           <goals>  
  10.             <goal>exec</goal>  
  11.           </goals>  
  12.         </execution>  
  13.       </executions>  
  14.       <configuration>  
  15.         <executable>java</executable>  
  16.         <includeProjectDependencies>true</includeProjectDependencies>  
  17.         <includePluginDependencies>false</includePluginDependencies>  
  18.         <classpathScope>compile</classpathScope>  
  19.         <mainClass>${storm.topology}</mainClass>  
  20.       </configuration>  
  21.     </plugin>  
  22.   </plugins>  
  23. </build>  


然后运行下面的命令,通过-Dstorm.topology指定要运行的主类(对应上面插件配置中的${storm.topology}):

[plain] view plain copy
 
 print?
    1. mvn compile exec:java -Dstorm.topology=org.csfreebird.HelloTopology    
原文地址:https://www.cnblogs.com/oxspirt/p/7375219.html