3. Storm编程框架

实例分析lifeCycle:



 
 
RandomWordSpout
  1.  1 package cn.itcast.storm.spout;
     2 import java.util.Map;
     3 import java.util.Random;
     4 import org.apache.commons.logging.Log;
     5 import org.apache.commons.logging.LogFactory;
     6 import backtype.storm.spout.SpoutOutputCollector;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.OutputFieldsDeclarer;
     9 import backtype.storm.topology.base.BaseRichSpout;
    10 import backtype.storm.tuple.Fields;
    11 import backtype.storm.tuple.Values;
    12 import backtype.storm.utils.Utils;
    13 public class RandomWordSpout extends BaseRichSpout {
    14     private static final long serialVersionUID = -4287209449750623371L;
    15     
    16     private static final Log log = LogFactory.getLog(RandomWordSpout.class);
    17     private SpoutOutputCollector collector;
    18     
    19     private String[] words = new String[]{"storm", "hadoop", "hive", "flume"};
    20     
    21     private Random random = new Random();
    22     
    23     public RandomWordSpout() {
    24         log.warn("RandomWordSpout constructor method invoked");
    25     }
    26     @Override
    27     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    28         log.warn("RandomWordSpout open() method invoked");
    29         this.collector = collector;
    30     }
    31     @Override
    32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    33         log.warn("RandomWordSpout declareOutputFields() method invoked");
    34         declarer.declare(new Fields("str"));
    35     }
    36     @Override
    37     public void nextTuple() {
    38         log.warn("RandomWordSpout nextTuple() method invoked");
    39         Utils.sleep(500);
    40         String str = words[random.nextInt(words.length)];
    41         collector.emit(new Values(str));
    42     }
    43     @Override
    44     public void activate() {
    45         log.warn("RandomWordSpout activate() method invoked");
    46     }
    47     @Override
    48     public void deactivate() {
    49         log.warn("RandomWordSpout deactivate() method invoked");
    50     }
    51 }
TransferBolt
  1.  1 package cn.itcast.storm.bolt;
     2 import java.util.Map;
     3 import org.apache.commons.logging.Log;
     4 import org.apache.commons.logging.LogFactory;
     5 import backtype.storm.task.TopologyContext;
     6 import backtype.storm.topology.BasicOutputCollector;
     7 import backtype.storm.topology.OutputFieldsDeclarer;
     8 import backtype.storm.topology.base.BaseBasicBolt;
     9 import backtype.storm.tuple.Fields;
    10 import backtype.storm.tuple.Tuple;
    11 import backtype.storm.tuple.Values;
    12 public class TransferBolt extends BaseBasicBolt {
    13     private static final long serialVersionUID = 4223708336037089125L;
    14     private static final Log log = LogFactory.getLog(TransferBolt.class);
    15     
    16     public TransferBolt() {
    17         log.warn("TransferBolt constructor method invoked");
    18     }
    19     
    20     @Override
    21     public void prepare(Map stormConf, TopologyContext context) {
    22         log.warn("TransferBolt prepare() method invoked");
    23     }
    24     @Override
    25     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    26         log.warn("TransferBolt declareOutputFields() method invoked");
    27         declarer.declare(new Fields("word"));
    28     }
    29     @Override
    30     public void execute(Tuple input, BasicOutputCollector collector) {
    31         log.warn("TransferBolt execute() method invoked");
    32         String word = input.getStringByField("str");
    33         collector.emit(new Values(word));
    34     }
    35 }

WriterBolt
  1.  1 package cn.itcast.storm.bolt;
     2 import java.io.FileWriter;
     3 import java.io.IOException;
     4 import java.util.Map;
     5 import org.apache.commons.logging.Log;
     6 import org.apache.commons.logging.LogFactory;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.BasicOutputCollector;
     9 import backtype.storm.topology.OutputFieldsDeclarer;
    10 import backtype.storm.topology.base.BaseBasicBolt;
    11 import backtype.storm.tuple.Tuple;
    12 public class WriterBolt extends BaseBasicBolt {
    13     private static final long serialVersionUID = -6586283337287975719L;
    14     
    15     private static final Log log = LogFactory.getLog(WriterBolt.class);
    16     
    17     private FileWriter writer = null;
    18     
    19     public WriterBolt() {
    20         log.warn("WriterBolt constructor method invoked");
    21     }
    22     @Override
    23     public void prepare(Map stormConf, TopologyContext context) {
    24         log.warn("WriterBolt prepare() method invoked");
    25         try {
    26             writer = new FileWriter("/home/" + this);
    27         } catch (IOException e) {
    28             log.error(e);
    29             throw new RuntimeException(e);
    30         }
    31     }
    32     @Override
    33     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    34         log.warn("WriterBolt declareOutputFields() method invoked");
    35     }
    36     
    37     @Override
    38     public void execute(Tuple input, BasicOutputCollector collector) {
    39         log.warn("WriterBolt execute() method invoked");
    40         String s = input.getString(0);
    41         try {
    42             writer.write(s);
    43             writer.write("
    ");
    44             writer.flush();
    45         } catch (IOException e) {
    46             log.error(e);
    47             throw new RuntimeException(e);
    48         }
    49     }
    50 }
 
TopoMain
  1. package cn.itcast.storm.topology;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import cn.itcast.storm.bolt.TransferBolt;
    import cn.itcast.storm.bolt.WriterBolt;
    import cn.itcast.storm.spout.RandomWordSpout;
    /**
     * Entry point that wires the life-cycle topology and submits it under the name
     * {@code "life-cycle"}.
     *
     * <p>Wiring: spout {@code "random"} (parallelism 2) feeds bolt {@code "transfer"}
     * (parallelism 4, shuffle grouping), which feeds bolt {@code "writer"} (parallelism 4,
     * fields grouping on {@code "word"}).
     */
    public class TopoMain {

        private static final Log log = LogFactory.getLog(TopoMain.class);

        /**
         * Builds the topology, configures two workers with debug enabled, and submits it.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if submitting the topology fails
         */
        public static void main(String[] args) throws Exception {
            TopologyBuilder topology = new TopologyBuilder();

            // Components are constructed in the same order as they are wired:
            // spout first, then the two bolts.
            RandomWordSpout source = new RandomWordSpout();
            topology.setSpout("random", source, 2);

            TransferBolt relay = new TransferBolt();
            topology.setBolt("transfer", relay, 4)
                    .shuffleGrouping("random");

            WriterBolt sink = new WriterBolt();
            topology.setBolt("writer", sink, 4)
                    .fieldsGrouping("transfer", new Fields("word"));

            Config settings = new Config();
            settings.setNumWorkers(2);
            settings.setDebug(true);

            log.warn("submitting topology...");
            StormSubmitter.submitTopology("life-cycle", settings, topology.createTopology());
            log.warn("topology submitted !");
        }
    }

方法执行顺序:
  • Spout方法调用顺序
  1. declareOutputFields()(调用一次)
  2. open() (调用一次)
  3. activate() (调用一次)
  4. nextTuple()    (循环调用 )    
  5. deactivate() (手动调用)
  • Bolt方法调用顺序
  1. declareOutputFields() (调用一次)
  2. prepare() (调用一次)
  3. execute()     (循环执行)
执行日志:
  1. [root@master work]# storm jar lifeCycle1.jar cn.itcast.storm.topology.TopoMain
  2. Running: /usr/local/jdk/bin/java -client -Dstorm.options= -Dstorm.home=/usr/local/apache-storm-0.9.4 -Dstorm.log.dir=/usr/local/apache-storm-0.9.4/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/apache-storm-0.9.4/lib/clj-time-0.4.1.jar:.... -Dstorm.jar=lifeCycle1.jar cn.itcast.storm.topology.TopoMain
  3. 464[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout constructor method invoked #初始化对象,执行构造方法
  4. 490[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt constructor method invoked
  5. 505[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt constructor method invoked
  6. 515[main] WARN cn.itcast.storm.topology.TopoMain- submitting topology...
  7. 516[main] WARN cn.itcast.storm.bolt.TransferBolt-TransferBolt declareOutputFields() method invoked
  8. 906[main] WARN cn.itcast.storm.bolt.WriterBolt-WriterBolt declareOutputFields() method invoked
  9. 909[main] WARN cn.itcast.storm.spout.RandomWordSpout-RandomWordSpout declareOutputFields() method invoked
  10. 1106[main] INFO backtype.storm.StormSubmitter-Jar not uploaded to master yet.Submitting jar...
  11. 1117[main] INFO backtype.storm.StormSubmitter-Uploading topology jar lifeCycle1.jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
  12. 1361[main] INFO backtype.storm.StormSubmitter-Successfully uploaded topology jar to assigned location:/tmp/storm/nimbus/inbox/stormjar-13252904-45c2-41e8-8703-957feae2bf27.jar
  13. 1362[main] INFO backtype.storm.StormSubmitter-Submitting topology life-cycle in distributed mode with conf {"topology.workers":2,"topology.debug":true}
  14. 1568[main] INFO backtype.storm.StormSubmitter-Finished submitting topology: life-cycle
  15. 1568[main] WARN cn.itcast.storm.topology.TopoMain- topology submitted !
 
 
 
worker日志
  1. 2015-05-16T17:57:18.295+0800 b.s.d.worker [INFO] Worker 6ae03c97-dac4-4ef3-9f10-227de1219b16 for storm life-cycle-4-1431770222 on 1360b011-2e64-4964
  2. -9f6c-d849db954ff2:6703 has finished loading
  3. 2015-05-16T17:57:18.797+0800 b.s.d.executor [INFO]Preparing bolt transfer:(5)
  4. 2015-05-16T17:57:18.798+0800 b.s.d.executor [INFO]Preparing bolt writer:(11)
  5. 2015-05-16T17:57:18.812+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
  6. 2015-05-16T17:57:18.813+0800 b.s.d.executor [INFO]Prepared bolt writer:(11)
  7. 2015-05-16T17:57:18.820+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
  8. 2015-05-16T17:57:18.821+0800 b.s.d.executor [INFO]Prepared bolt transfer:(5)
  9. 2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt __system:(-1)
  10. 2015-05-16T17:57:18.834+0800 b.s.d.executor [INFO]Preparing bolt transfer:(7)
  11. 2015-05-16T17:57:18.839+0800 c.i.s.b.TransferBolt[WARN]TransferBolt prepare() method invoked
  12. 2015-05-16T17:57:18.839+0800 b.s.d.executor [INFO]Prepared bolt transfer:(7)
  13. 2015-05-16T17:57:18.840+0800 b.s.d.executor [INFO]Preparing bolt __acker:(1)
  14. 2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Opening spout random:(3)
  15. 2015-05-16T17:57:18.841+0800 b.s.d.executor [INFO]Preparing bolt writer:(9)
  16. 2015-05-16T17:57:18.842+0800 c.i.s.b.WriterBolt[WARN]WriterBolt prepare() method invoked
  17. 2015-05-16T17:57:18.842+0800 b.s.d.executor [INFO]Prepared bolt writer:(9)
  18. 2015-05-16T17:57:18.846+0800 b.s.d.executor [INFO]Prepared bolt __acker:(1)
  19. 2015-05-16T17:57:18.848+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout open() method invoked
  20. 2015-05-16T17:57:18.854+0800 b.s.d.executor [INFO]Opened spout random:(3)
  21. 2015-05-16T17:57:18.867+0800 b.s.d.executor [INFO]Prepared bolt __system:(-1)
  22. 2015-05-16T17:57:18.873+0800 b.s.d.executor [INFO]Activating spout random:(3)
  23. 2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout activate() method invoked
  24. 2015-05-16T17:57:18.873+0800 c.i.s.s.RandomWordSpout[WARN]RandomWordSpout nextTuple() method invoked
  25. 2015-05-16T17:57:19.159+0800 b.s.d.executor [INFO]Processing received message source: random:4, stream: default, id:{},[hadoop]
  26. 2015-05-16T17:57:19.160+0800 c.i.s.b.TransferBolt[WARN]TransferBolt execute() method invoked
  27. 2015-05-16T17:57:19.161+0800 b.s.d.task [INFO]Emitting: transfer default [hadoop]
  28. 2015-05-16T17:57:19.162+0800 b.s.d.executor [INFO]Processing received message source: transfer:7, stream: default, id:{},[hadoop]
  29. 2015-05-16T17:57:19.162+0800 c.i.s.b.WriterBolt[WARN]WriterBolt execute() method invoked



将来的你,一定会感谢现在拼命努力的你。
原文地址:https://www.cnblogs.com/51runsky/p/4572823.html