使用storm分别进行计数和词频统计

计数

直接上代码

public class LocalStormSumTopology {

    public static void main(String[] agrs) {

        //Topology是通过build模式创建出来的
        //storm中的所有作业都是通过topology来指定的
        TopologyBuilder builder = new TopologyBuilder();

        //在设置bolt到topology时,需要设置该bolt的上游的spout或者bolt的id,这样topology才知道该bolt的执行顺序,有点类似于单向链表结构,
        //每一个环节持有上一个环节的引用,在bolt这里是持有上一个环节的id,这样同样可以定位到上一个环节
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("TotalBolt", new TotalBolt()).shuffleGrouping("DataSourceSpout");


        //启动一个本地的Storm集群,不需要搭真正的集群,本地集群使用LocalCluster来提交topology,如果是在生产环境上提交topology,那么使用
        //这个类StormSubmitter来代替LocalCluster来提交topology
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalStormSumTopology", new Config(), builder.createTopology());
    }


    private static final String NUM = "num";

    /**
     * 发送数据源的spout类,一般是继承BaseRichSpout这个类
     */
    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector mCollector;

        int num;

        /**
         * 在storm开始的开始工作前回调一次,在这里做初始化
         *
         * @param conf      配置参数
         * @param context   上下文
         * @param collector 数据发射器,用来将数据发送到bolt中,类似于rxjava的数据发射器
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.mCollector = collector;
        }


        /**
         * 这是一个死循环方法,会自动循环调用,这个方法用来发送数据到下游
         */
        public void nextTuple() {

            //将数据发射到bolt中,一般使用Values这个类,传入的是可变参数,底层封装成ArrayList
            mCollector.emit(new Values(++num));

            System.out.println("从spout发射出的数据:" + num);

            Utils.sleep(1000);
        }

        /**
         * 声明从spout中发射的数据的字段名,在bolt阶段可以通过这里预设置的字段名进行取值,类似于安卓中的使用sp传输,
         * 字段名和发送出来的数据一一对应,这样如果下游需要接收多个数据发射源,那么可以通过该字段名来做区别
         *
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            
            //一般使用Fields来进行封装字段名fields底层封装了ArrayList<String>
            declarer.declare(new Fields(NUM));
        }

        @Override
        public void close() {
            this.mCollector = null;
        }
    }

    public static class TotalBolt extends BaseRichBolt {

        private int sum = 0;

        /**
         * 初始化方法,跟spout中的open方法类似,只会调用一次,在这里做初始化
         *
         * @param stormConf
         * @param context
         * @param collector
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        /**
         * 每从上游接收到一个数据,就调用该方法回调过来
         *
         * @param input 用来提取上一个流程传过来的数据
         */
        public void execute(Tuple input) {
            
            //通过在上游设置的字段名来获取数据
            Integer integerByField = input.getIntegerByField(NUM);
            sum += integerByField;
            System.out.println("累加的结果是:" + sum);
        }

        /**
         * 为往下游发送的数据加上字段名,方面区别数据的来源
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

    }
}

词频统计

直接上代码

public class LocalWorldCountStormTopology {


    public static void main(String[] agrs) {

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("DataSourceSpout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWorldCountStormTopology", new Config(), builder.createTopology());
    }

    /**
     * 输出每一行文本的spout
     */

    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector mCollector;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.mCollector = collector;
        }

        public void nextTuple() {

            //通过这个方法,可以获取到某一个文件夹下所有符合规定后缀的文件,并且可以设置是否递归获取
            Collection<File> files = FileUtils.listFiles(new File("/Users/teng/Downloads"), new String[]{"txt"}, true);

            try {
                for (File file : files) {
                    
                    //因为下一步还需要做切割,因此需要先将文件一行一行取出来,放在String集合中
                    List<String> lines = FileUtils.readLines(file);
                    for (String line : lines) {
                        //使用,进行分割
                        String[] split = line.split(",");
                        //发射单词出去
                        for (String s : split) {
                            mCollector.emit(new Values(s));
                        }
                    }
                //执行完成一次之后,需要修改文件名,这样就不用一直执行
                FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //定义数据的字段名
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * 统计词频的bolt
     */
    public static class CountBolt extends BaseRichBolt {

        private Map<String, Integer> map = new HashMap<String, Integer>();

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        public void execute(Tuple input) {
            String word = input.getStringByField("word");

            Integer num = map.get(word);
            if (num == null) {
                num = 1;
            } else {
                num++;
            }

            map.put(word, num);

            System.out.println("~~~~~~~~~");
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                System.out.println(entry.getKey() + "出现的次数为:" + entry.getValue());
            }

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

}

原文地址:https://www.cnblogs.com/flowyourheart/p/shi-yongstorm-fen-bie-jin-xing-ji-shu-he-ci-pin-to.html