Storm的开发使用

一、开发

* 假定是用IDEA工具开发,这里实现的是上面(2)类型的2层Bolt实例,Spout -> Bolt1 -> Bolt2

1.创建Maven项目

项目名是StormProcessor,包名是com.clotho.storm。后面运行命令时会用到。

2.配置Maven

在pom.xml的<dependencies>和</dependencies>中间加入以下内容:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
    <!--<scope>${provided.scope}</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-client</artifactId>
    <version>2.3.0</version>
</dependency>

* 如果pom.xml有其它Hadoop组件的引用,运行时有可能会报错。例如:

“Detected both log4j-over-slf4j.jar AND bound slf4j-log4j12.jar on the class path”

遇到这种情况,有2个选择:

a.独立一个项目开发Storm程序

b.exclude掉引发异常的依赖项(在pom.xml编辑界面:右键 -> Maven -> Show Dependencies -> 点击Show Conflicts/Duplicates按钮查看)

3.Spout类

可以实现IRichSpout接口,也可以扩展BaseRichSpout类,BaseRichSpout类也是实现IRichSpout接口

Spout一般是负责获取数据或生成数据,并发送给下一层Bolt处理。

public class TextSpout extends BaseRichSpout
{
    private SpoutOutputCollector collector;
    private static final String field = "text";
    private int count = 0;
    private String[] message = {
            "这是 第 1 条 信息 ~ 统计",
            "这是 第 2 条 信息 ~ 统计 测试",
            "这是 第 3 条 信息 ~ 测试"
    };

    /**
     * 在Spout初始化时被调用
     *
     * @param map
     * @param context
     * @param collector
     */
    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector)
    {
        System.out.println("open:" + map.get("test"));
        this.collector = collector;
    }

    /**
     * Spout实现的核心方法,会被循环调用
     */
    @Override
    public void nextTuple()
    {
        if (count < message.length)
        {
            System.out.println("第" + count + "次开始发送数据..");
            collector.emit(new Values(message[count]));
        }
        count++;
    }

    /**
     * 声明要输出的Tuple的字段名称
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        System.out.println("定义格式...");
        declarer.declare(new Fields(field));
    }
}

4.第1层的Bolt类

可以实现IRichBolt接口,也可以扩展BaseRichBolt类,BaseRichBolt类也是实现IRichBolt接口

这层Bolt是负责把前面传过来的文本进行空格分词,并传递给下一层Bolt

public class SplitBolt extends BaseRichBolt
{
    private OutputCollector collector;
    private static final String field1 = "text";
    private static final String field2 = "count";

    /**
     * 在Bolt启动前执行,提供Bolt启动环境配置的入口
     */
    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector)
    {
        System.out.println("prepare:" + map.get("test"));
        this.collector = collector;
    }

    /**
     * Bolt实现的核心方法
     */
    @Override
    public void execute(Tuple tuple)
    {
        String message = tuple.getStringByField(field1);
        System.out.println("开始分割单词:" + message);
        String[] words = message.toLowerCase().split(" ");
        for (String word : words)
        {
            collector.emit(new Values(word)); //向下一个Bolt发送数据
        }
    }

    /**
     * 声明数据格式
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields(field2));
    }

    /**
     * Storm在终止一个Bolt之前会调用这个方法
     */
    @Override
    public void cleanup()
    {
        System.out.println("TestBolt的资源释放");
    }
}

5.第2层的Bolt类

这层Bolt是负责对上一层传来的词进行统计

public class StatBolt extends BaseRichBolt
{
    private static final String field = "count";
    private int count = 1;

    /**
     * 保存单词和对应的计数
     */
    private HashMap<String, Integer> counts = null;

    /**
     * @param map
     * @param context
     * @param collector
     */
    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector)
    {
        System.out.println("prepare:" + map.get("test"));
        counts = new HashMap<String, Integer>();
    }

    /**
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple)
    {
        String message = tuple.getStringByField(field);
        System.out.println("第" + count + "次统计单词出现的次数");
        if (!counts.containsKey(message))
        {
            counts.put(message, 1);
        }
        else
        {
            counts.put(message, counts.get(message) + 1);
        }
        count++;
    }

    /**
     *
     */
    @Override
    public void cleanup()
    {
        System.out.println("===========开始显示单词数量============");
        for (Map.Entry<String, Integer> entry : counts.entrySet())
        {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        System.out.println("===========结束============");
        System.out.println("Test2Bolt的资源释放");
    }

    /**
     * 声明数据格式
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
    }
}

6.Topology类

负责绑定Spout和Bolt1、Bolt2,并提交给Storm

public class CountBolt extends BaseRichBolt
{
    private static final String field = "count";
    private int count = 1;

    /**
     * 保存单词和对应的计数
     */
    private HashMap<String, Integer> counts = null;

    /**
     * @param map
     * @param context
     * @param collector
     */
    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector)
    {
        System.out.println("prepare:" + map.get("test"));
        counts = new HashMap<String, Integer>();
    }

    /**
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple)
    {
        String message = tuple.getStringByField(field);
        System.out.println("第" + count + "次统计单词出现的次数");
        if (!counts.containsKey(message))
        {
            counts.put(message, 1);
        }
        else
        {
            counts.put(message, counts.get(message) + 1);
        }
        count++;
    }

    /**
     *
     */
    @Override
    public void cleanup()
    {
        System.out.println("===========开始显示单词数量============");
        for (Map.Entry<String, Integer> entry : counts.entrySet())
        {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        System.out.println("===========结束============");
    }

    /**
     * 声明数据格式
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
    }
}

二、启动

1.编译成jar文件并上传到服务器

假定项目在E:\Code\StormProcess\目录

cd e:\Code\StormProcess
mvn package

打包后,在E:\Code\StormProcess\target\目录下会有StormProcess-1.0.jar文件,把文件上传到服务器

2.用命令提交到Storm执行

(1) 有2种方式

a.使用代码中的命名作为拓扑名

storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology

b.运行时在命令中指定拓扑名

storm jar StormProcess-1.0.jar com.clotho.storm.StatTopology WordCount

(2) 验证

storm list

* 也可以在Storm UI查看

(3) 删除拓扑(Topology)

storm kill StormProcess

* 也可以在Storm UI删除,进入具体拓扑(Topology),点击Topology actions的kill按钮

附录:

Storm的原理和机制

https://www.cnblogs.com/live41/p/15560493.html

Storm的安装与部署

https://www.cnblogs.com/live41/p/15555719.html

原文地址:https://www.cnblogs.com/live41/p/15563263.html