storm集群搭建和java应用

1. 配置主机名映射(c0,c1,c2都要修改):vim /etc/hosts,添加以下内容;
并配置 c0 到 c1、c2 的 ssh 免密登录(第4步用 scp 拷贝文件时需要)
192.168.132.154 c0
192.168.132.156 c1
192.168.132.155 c2

storm集群:
192.168.132.154 c0
192.168.132.156 c1
192.168.132.155 c2

2. 请在官网下载,并解压。文末有文件下载地址
tar -zxvf apache-storm-1.2.3.tar.gz

  配置环境变量(c0,c1,c2都要修改)

vim /etc/profile
# storm
export STORM_HOME=/home/xiaozw/soft/java/storm
export PATH=$PATH:${JAVA_PATH}:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin:$SPARK_HOME/bin:${STORM_HOME}/bin
刷新生效
source /etc/profile

3. 修改storm/conf/storm.yaml配置文件

storm.zookeeper.servers:

- "192.168.132.154"
- "192.168.132.156"
- "192.168.132.155"
nimbus.seeds: ["192.168.132.154"]
storm.local.dir: "/home/xiaozw/soft/tmp/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

  

创建storm目录(c0,c1,c2都要创建)

mkdir -p /home/xiaozw/soft/tmp/storm

4. 拷贝storm文件夹到集群中的其它节点上(c1,c2)
scp -r /home/xiaozw/soft/java/storm root@c1:/home/xiaozw/soft/java/
scp -r /home/xiaozw/soft/java/storm root@c2:/home/xiaozw/soft/java/

先启动zookeeper(c0,c1,c2 都要启动,即 storm.yaml 中 storm.zookeeper.servers 列出的所有节点),安装zookeeper请查看其它文档。
zkServer.sh start
5. 启动storm(第2步已把 ${STORM_HOME}/bin 加入 PATH,可在任意目录直接执行 storm 命令)
c0,nimbus上启动
storm nimbus >> /dev/null 2>&1 &
启动界面
storm ui >> /dev/null 2>&1 &

  c1,c2 supervisor上启动

storm supervisor >> /dev/null 2>&1 &

 界面查看,http://192.168.132.154:8080

 6. java wordcount程序。

package com.xiaozw.demo4.storm;


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class WordCountTopology {
    /**
     * spout 继承一个基类,实现接口,这个里面主要是负责从数据源获取数据。
     * 简化从内部发射数据。
     */
    public static class RandomSentenceSpout extends BaseRichSpout{

        private static final long serialVersionUID = -8017609899644290351L;

        private SpoutOutputCollector collector;

        private Random random;

        /**
         * 对spout初始化,创建线程,数据库连接
         * @param conf
         * @param topologyContext
         * @param collector
         */
        @Override
        public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
            //初始化数据,SpoutOutputCollector用来发射数据出去,
            this.collector=collector;
            this.random=new Random();
        }

        /**
         * 最终运行在task中,某个worker进程的某个executor线程内部。
         * 某个task负责无限循环调用nextTuple方法,
         * 形成数据流。
         */
        @Override
        public void nextTuple() {
            Utils.sleep(100);
            String[] sentences = new String[] {
                    "the cow jumped over the moon", "an apple a day keeps the doctor away",
                    "four score and seven years ago", "snow white and the seven dwarfs",
                    "i am at two with nature" };
            String sentence = sentences[random.nextInt(sentences.length)];
            System.err.println("【发射句子】sentence=" + sentence);
            collector.emit(new Values(sentence));
        }

        /**
         * 发射出去每个tuple中field名称是什么。
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    /**
     * 每个bolt同样是发送到worker某个executor的task中执行
     *
     * @author Administrator
     *
     */
    public static class SplitSentence extends BaseRichBolt {

        private static final long serialVersionUID = -1863792429350238883L;

        private OutputCollector collector;

        /**
         * 对于bolt来说,第一个方法就是prepare()方法。
         */
        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * 每接收到一条数据后,就会交给executor方法来执行
         */
        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            if (sentence != null && "".equals(sentence) == false) {
                String[] words = sentence.split(" ");
                for (String word : words) {
                    collector.emit(new Values(word));
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

    }

    /**
     * 单词计数bolt
     *
     * @author Administrator
     *
     */
    public static class WordCount extends BaseRichBolt {

        private static final long serialVersionUID = -8940950046975910504L;

        //private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);

        private OutputCollector collector;
        private Map<String, Integer> wordCounts = new HashMap<String, Integer>();

        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer count = wordCounts.get(word);
            if (count == null) {
                count = 0;
            }
            wordCounts.put(word, ++count);
            System.err.println("【单词计数】" + word + "出现的次数是" + count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }

    }

    public static void main(String[] args) throws Exception{
        // 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑
        TopologyBuilder builder = new TopologyBuilder();

        // 第一个参数的意思,就是给这个spout设置一个名字
        // 第二个参数的意思,就是创建一个spout的对象
        // 第三个参数的意思,就是设置spout的executor有几个
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 5);

        builder.setBolt("SplitSentence", new SplitSentence(), 10).setNumTasks(20)
                .shuffleGrouping("RandomSentence");
        // 这里设置fieldsGrouping很重要,相同的单词从SplitSentence发射出来时,一定会进入到下游的指定的同一个task中
        // 只有这样子,才能准确的统计出每个单词的数量
        builder.setBolt("WordCount", new WordCount(), 10).setNumTasks(20).fieldsGrouping(
                "SplitSentence", new Fields("word"));

        Config config = new Config();
        if(args!=null && args.length>0){
            config.setNumWorkers(3);
            try{
                StormSubmitter.submitTopologyWithProgressBar(args[0],config,builder.createTopology());
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
        else{
            config.setMaxTaskParallelism(20);
            // 在eclipse本地运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("WordCountTopology", config, builder.createTopology());
            Utils.sleep(60000);
            cluster.shutdown();
        }
    }
}

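打包 mvn clean install -DskipTests
(注意:pom.xml 中 storm-core 依赖应设为 <scope>provided</scope>,不要打进 jar 包,集群上已自带 storm 的 jar;否则提交到集群时会报 defaults.yaml 重复的错误)
上传jar包到测试服务器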

  

  执行命令:

storm jar demo4-1.0-SNAPSHOT.jar com.xiaozw.demo4.storm.WordCountTopology WordCountTopology

  查看UI界面 http://192.168.132.154:8080/index.html

 

 7. 源码和文件下载地址

链接:https://pan.baidu.com/s/1RmBlhZ_p-30clHoUhxycBg
提取码:ik6g

  

原文地址:https://www.cnblogs.com/xiaozw/p/11904641.html