storm wordcount练手

spout ----拆分 bolt ---合并 bolt

spout

package cn.ljh.storm.wordcount;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

//采集数据: Spout组件
public class WordCountSpout extends BaseRichSpout{
    //模拟产生一些数据
    private String[] data = {"I love Beijing","I love China","Beijing is the capital of China"};

    //collector:该Spout组件的收集器,用于把采集的数据发给下一个组件
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        //每隔3秒 采集一次数据
        Utils.sleep(3000);

        //由Storm的引擎调用,用于处理采集的每条数据

        //生成一个3以内的随机数
        int random = (new Random()).nextInt(3);
        String value = data[random];

        //打印
        System.out.println("采集的数据是:" + value);

        //发送给下一个组件
        this.collector.emit(new Values(value));
    }

    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        //collector:该Spout组件的收集器,用于把采集的数据发给下一个组件
        //在open方法中对collector初始化
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // 申明发送给下一个组建的tuple的schema(结构)
        declare.declare(new Fields("sentence"));
    }
}

拆分bolt

package cn.ljh.storm.wordcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//第一个bolt组件:单词拆分
public class WordCountSplitBolt extends BaseRichBolt{

    //collector:该bolt组件的收集器,用于把处理的数据发给下一个bolt组件
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        //如何处理上一级发来的数据: I love Beijing
       // String value = tuple.getStringByField("sentence");
        String value =tuple.getString(0);
        //分词
        String[] words = value.split(" ");

        //输出
        for(String w:words){
            collector.emit(new Values(w,1));
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // 初始化
        //collector:该bolt组件的收集器,用于把处理的数据发给下一个bolt组件
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // 申明发送给下一个组建的tuple的schema(结构)
        declare.declare(new Fields("word","count"));
    }
}

计数bolt

package cn.ljh.storm.wordcount;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//第二个Bolt组件:单词的计数
public class WordCountTotalBolt extends BaseRichBolt{
    //使用Map集合存储结果
    private Map<String, Integer> result = new HashMap<>();

    //collector:该bolt组件的收集器,用于把处理的数据发给下一个bolt组件
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        //取出数据
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        //求和
        if(result.containsKey(word)){
            //如果已经存在,累加
            int total = result.get(word);
            result.put(word, total+count);
        }else{
            //这是一个新单词
            result.put(word, count);
        }

        //输出到屏幕
        System.out.println("统计的结果是:" + result);

        //输出给下一个组件                                               单词           总频率
        this.collector.emit(new Values(word,result.get(word)));
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word","total"));
    }
}

拓扑链接

package cn.ljh.storm.wordcount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        //指定任务的spout组件
        builder.setSpout("mywordcountspout", new WordCountSpout());

        //指定任务的第一个bolt组件
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
                .shuffleGrouping("mywordcountspout");//随机分组

        //指定任务的第二个bolt组件
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
                .fieldsGrouping("mywordcountsplit", new Fields("word"));

        //创建任务
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        //任务有两种运行方式:1、本地模式   2、集群模式
        //1、本地模式
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        //2、集群模式:用于打包jar,并放到storm运行
//        StormSubmitter.submitTopology(args[0], conf, job);
    }


}
RUSH B
原文地址:https://www.cnblogs.com/tangsonghuai/p/11131285.html