大数据处理框架之Strom: Storm----helloword

大数据处理框架之Strom: Storm----helloword

Storm按照设计好的拓扑流程运转,所以写代码之前要先设计好拓扑图。
这里写一个简单的拓扑:


第一步:创建一个拓扑类
含有main方法的类型,作为程序入口:

package bhz.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import bhz.bolt.PrintBolt;
import bhz.bolt.WriteBolt;
import bhz.spout.PWSpout;



public class PWTopology1 {

    public static void main(String[] args) throws Exception {
        
        //拓扑配置类Config
        Config cfg = new Config();
        //启动两个Worker进程
        cfg.setNumWorkers(2);
        cfg.setDebug(true);
        
        //TopologyBuilder 组装拓扑的类
        TopologyBuilder builder = new TopologyBuilder();
        //设置数据源
        builder.setSpout("spout", new PWSpout());
        //设置第一个处理bolt,并指定该bolt属于spout分组
        builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
        //设置第二个bolt,并指定该bolt属于print-bolt分组
        builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");
        //创建拓扑
        StormTopology  top1 = builder.createTopology();
        
        //1 本地集群
        LocalCluster cluster = new LocalCluster();
        //拓扑的名字-top1
        cluster.submitTopology("top1", cfg, top1);
        Thread.sleep(10000);
        //10s后关闭该拓扑
        cluster.killTopology("top1");
        //关闭本地集群
        cluster.shutdown();
        
        //2 集群模式提交拓扑 (与本地模式不能共用)
        //StormSubmitter.submitTopology("top1", cfg, top1);
        
    }
}

第二步:创建数据源类
方式一:继承backtype.storm.topology.base.BaseRichSpout类
方式二:实现backtype.storm.topology.IRichSpout接口

package bhz.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PWSpout extends BaseRichSpout{

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    
    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    
    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }
    
    /**
     * 重写初始化方法  open
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //对spout进行初始化
        this.collector = collector;
        //System.out.println(this.collector);
    }
    
    /**
     * 轮询tuple
     * 一直不间断的从数据源取出数据
     */
    @Override
    public void nextTuple() {
        //随机发送一个单词
        final Random r = new Random();
        int num = r.nextInt(5);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //emit 发送数据
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * declarer声明发送数据的field
     * 下一个bolt会根据声明的field取值
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //进行声明
        declarer.declare(new Fields("print"));
    }

}

第三步:创建数据处理类

方式一:继承backtype.storm.topology.base.BaseBasicBolt类

方式二:实现backtype.storm.topology.IRichBolt接口

package bhz.bolt;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class PrintBolt extends BaseBasicBolt {

    private static final Log log = LogFactory.getLog(PrintBolt.class);
    
    private static final long serialVersionUID = 1L;
    
    /**
     * bolt处理类执行方法  
     * 在这里可以写具体业务逻辑,对数据进行怎样的处理...
     * 如果后面还有bolt 需要再使用emit发送数据了
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //获取上一个组件所声明的Field
        String print = input.getStringByField("print");
        log.info("【print】: " + print);
        //进行传递给下一个bolt
        collector.emit(new Values(print));
        
    }
    
    /**
     * declarer声明发送数据的field
     * 下一个bolt会根据声明的field取值
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("write"));
    }

}
package bhz.bolt;

import java.io.FileWriter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import clojure.main;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WriteBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    private static final Log log = LogFactory.getLog(WriteBolt.class);
    
    private FileWriter writer ;
    
    /**
     * bolt处理类执行方法  
     * 在这里可以写具体业务逻辑,对数据进行怎样的处理...
     * 如果后面没有bolt 就不需要再使用emit发送数据了
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //获取上一个组件所声明的Field
        String text = input.getStringByField("write");
        try {
            if(writer == null){
                if(System.getProperty("os.name").equals("Windows 10")){
                    writer = new FileWriter("D:\stormtest\" + this);
                } else if(System.getProperty("os.name").equals("Windows 8.1")){
                    writer = new FileWriter("D:\stormtest\" + this);
                } else if(System.getProperty("os.name").equals("Windows 7")){
                    writer = new FileWriter("D:\stormtest\" + this);
                } else if(System.getProperty("os.name").equals("Linux")){
                    System.out.println("----:" + System.getProperty("os.name"));
                    writer = new FileWriter("/usr/local/temp/" + this);
                }
            }
            log.info("【write】: 写入文件");
            writer.write(text);
            writer.write("
");
            writer.flush();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 如果后面没有bolt 就不需要再声明filed了 这里写一个空方法
     * 
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }
    


}

第四步:写完代码,运行

两种运行方式:
本地运行:

//1 本地集群
LocalCluster cluster = new LocalCluster();
//拓扑的名字-top1
cluster.submitTopology("top1", cfg, top1);
Thread.sleep(10000);
//10s后关闭该拓扑
cluster.killTopology("top1");
//关闭本地集群
cluster.shutdown();

集群运行:

//2 集群模式提交拓扑 (与本地模式不能共用)
StormSubmitter.submitTopology("top1", cfg, top1);

来看一下 集群运行模式:

将以上代码打包成storm01.jar 上传到集群 Nimbus主节点上

运行命令:storm jar jar包名 拓扑类全路径名

[cluster@PCS101 tempjar]$ storm jar storm01.jar bhz.topology.PWTopology1
会打印一些东西 看最后两行:
291  [main] INFO  backtype.storm.StormSubmitter - Submitting topology top1 in distributed mode with conf {"topology.workers":2,"topology.debug":true}
438  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: top1

查看任务命令:storm list

[cluster@PCS101 tempjar]$ storm list
Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
top1                 ACTIVE     5          2            32 

supervisor节点,使用jps查看  多了worker进程

[cluster@PCS103 ~]$ jps
22806 worker
12873 QuorumPeerMain
24271 Jps
18488 supervisor

UI界面查看:

查看worker日志:

[cluster@PCS101 apache-storm-0.9.2]$ cd logs
[cluster@PCS101 logs]$ ls
access.log  logviewer.log  metrics.log  nimbus.log  supervisor.log  ui.log  worker-6703.log
[cluster@PCS101 logs]$ tail -f worker-6703.log
2018-10-26 11:12:07 b.s.d.task [INFO] Emitting: spout default [groovy]
2018-10-26 11:12:07 b.s.d.task [INFO] Emitting: spout default [php]
2018-10-26 11:12:08 b.s.d.task [INFO] Emitting: spout default [groovy]
2018-10-26 11:12:08 b.s.d.task [INFO] Emitting: spout default [groovy]
2018-10-26 11:12:09 b.s.d.task [INFO] Emitting: spout default [java]
2018-10-26 11:12:09 b.s.d.task [INFO] Emitting: spout default [groovy]

关闭拓扑任务:

使用命令:storm kill top1

[cluster@PCS101 logs]$ storm kill top1
574  [main] INFO  backtype.storm.command.kill-topology - Killed topology: top1

UI界面:点击kill

另外案例:wordCount

原文地址:https://www.cnblogs.com/cac2020/p/9855150.html