Storm-WordCount示例

1、定义topology:

/**
 * Entry point that wires the word-count topology together and submits it
 * through {@code StormSubmitter} under the name "word-count-topology".
 *
 * <p>Data flow: word-spout -> group-bolt -> count-bolt -> report-bolt.
 */
public class TopologyMain {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();

        // Source of sentences.
        builder.setSpout("word-spout", new WordCountSpout());

        // Sentence splitter, fed from the spout with shuffle grouping.
        builder.setBolt("group-bolt", new WordGroupBolt())
                .shuffleGrouping("word-spout");

        // Word counter, fed with fields grouping on the "word" field.
        builder.setBolt("count-bolt", new WordCountBolt())
                .fieldsGrouping("group-bolt", new Fields("word"));

        // Reporter, fed with global grouping from the counter.
        builder.setBolt("report-bolt", new ReportBolt())
                .globalGrouping("count-bolt");

        // Default configuration; nothing is customized here.
        Config conf = new Config();
        StormSubmitter.submitTopology("word-count-topology", conf, builder.createTopology());

        // NOTE(review): this pause runs after submission has already returned;
        // it looks like a leftover from local-cluster testing -- confirm whether
        // it is still needed.
        Thread.sleep(10000);
    }
}

2、spout:

/**
 * Spout that simulates a data source by cycling endlessly through a fixed
 * list of sentences, emitting one sentence per tuple.
 */
public class WordCountSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    // Simulated data source: every sentence is emitted as one tuple.
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework"
    };
    private SpoutOutputCollector out;
    // Position of the next sentence to emit.
    private int cursor = 0;

    /** Called when the spout is initialized; keeps the collector for later emits. */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        out = collector;
    }

    /** Emits the sentence at the cursor, then advances the cursor, wrapping to the start. */
    @Override
    public void nextTuple() {
        String sentence = sentences[cursor];
        out.emit(new Values(sentence));
        cursor = (cursor + 1) % sentences.length;
    }

    /**
     * Declares the single output field. It is named "word" although each
     * tuple carries a whole sentence.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("word");
        declarer.declare(outputFields);
    }

}

3、bolt:

/**
 * Bolt that splits each incoming sentence on single spaces and emits one
 * tuple per non-empty word.
 *
 * <p>Input: field "word" holding a sentence. Output: field "word" holding a
 * single word.
 */
public class WordGroupBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    /** Stores the collector used by {@link #execute(Tuple)} to emit and ack. */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Splits the sentence carried by {@code input} and emits every word,
     * anchored to {@code input}, then acks {@code input}.
     *
     * @param input tuple whose "word" field holds the sentence to split
     */
    @Override
    public void execute(Tuple input) {
        String sentence = input.getStringByField("word");
        for (String w : sentence.split(" ")) {
            // Consecutive or leading spaces make split() yield empty strings;
            // those are not words and must not be counted downstream.
            if (w.isEmpty()) {
                continue;
            }
            // Anchor the emitted tuple to its input so it stays part of the
            // same tuple tree.
            this.collector.emit(input, new Values(w));
        }
        // BaseRichBolt does not ack automatically; every processed tuple
        // must be acked explicitly.
        this.collector.ack(input);
    }

    /** Declares the single output field "word", matching the name used by the spout. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

/**
 * Bolt that keeps a running count per word and emits the updated
 * (word, count) pair every time a word arrives.
 *
 * <p>Counts are held in an in-memory map created in {@link #prepare}, so
 * they start from zero whenever the bolt is prepared again.
 */
public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // word -> number of times this bolt instance has seen it
    private Map<String, Long> countMap = null;

    /** Stores the collector and creates an empty count map. */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.countMap = new HashMap<String, Long>();
    }

    /**
     * Increments the count of the word carried by {@code input}, emits the
     * new (word, count) pair anchored to {@code input}, then acks it.
     *
     * @param input tuple whose "word" field holds the word to count
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long previous = this.countMap.get(word);
        // A word seen for the first time starts at 1.
        long count = (previous == null) ? 1L : previous + 1L;
        this.countMap.put(word, count);
        // Anchor the emitted tuple to its input so it stays part of the
        // same tuple tree.
        this.collector.emit(input, new Values(word, count));
        // BaseRichBolt does not ack automatically; every processed tuple
        // must be acked explicitly.
        this.collector.ack(input);
    }

    /** Declares the output fields "word" and "count". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

}

/**
 * Terminal bolt that logs every (word, count) update it receives and keeps
 * the latest count per word in memory. It emits nothing.
 */
public class ReportBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(ReportBolt.class);
    private OutputCollector collector;
    // word -> most recent count received for it
    private Map<String, Long> countMap = null;

    /** Stores the collector (needed for acking) and creates an empty result map. */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.countMap = new HashMap<String, Long>();
    }

    /**
     * Records and logs the count carried by {@code input}, then acks it.
     *
     * @param input tuple with a "word" field and a "count" field
     */
    @Override
    public void execute(Tuple input) {
        logger.info("ReportBolt print result:");
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.countMap.put(word, count);
        logger.info("{}-----------------{}", word, count);
        // BaseRichBolt does not ack automatically; every processed tuple
        // must be acked explicitly.
        this.collector.ack(input);
    }

    /** Declares no output fields: this bolt is the end of the stream. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }

    /** Performs no extra work beyond the superclass cleanup. */
    @Override
    public void cleanup() {
        super.cleanup();
    }

}

查看日志,可通过UI查询,如ip:8080可以看到提交的topology名称,点进去,点端口号可以查看日志

也可以登录到对应工作节点的storm安装目录下logs文件夹下,每个端口号都会生成一个log文件,查看对应端口号的日志文件即可查看打印信息

原文地址:https://www.cnblogs.com/zhli/p/4820185.html