Storm: an example of emitting multiple streams from one bolt

1. Topology: each downstream bolt subscribes to one named stream of TRANSFORM_BOLT.

builder.setBolt(TRANSFORM_BOLT, new TransformationBolt(), 1).shuffleGrouping(MY_SPOUT);
builder.setBolt(PROCESS1_BOLT, new FirstProcessBolt(), 1).shuffleGrouping(TRANSFORM_BOLT, "StreamOne");
builder.setBolt(PROCESS2_BOLT, new SecondProcessBolt(), 1).shuffleGrouping(TRANSFORM_BOLT, "StreamTwo");
builder.setBolt(PROCESS3_BOLT, new ThirdProcessBolt(), 1).shuffleGrouping(TRANSFORM_BOLT, "StreamThree");

2. Bolt (TransformationBolt): declares the three streams and emits to each of them by stream id.

@Override
public void execute(Tuple input) {
    // perform some logic, then emit to each named stream, anchored to the input tuple
    collector.emit("StreamOne", input, new Values(...));
    collector.emit("StreamTwo", input, new Values(...));
    collector.emit("StreamThree", input, new Values(...));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("StreamOne", new Fields(...));
    declarer.declareStream("StreamTwo", new Fields(...));
    declarer.declareStream("StreamThree", new Fields(...));
}
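
To make the routing rule concrete, here is a minimal plain-Java sketch of how named streams behave. It is a toy model, not the Storm API: the class NamedStreamDemo and its methods subscribe, emit, and receivedBy are hypothetical names made up for illustration. The assumption it models is that a consumer only receives tuples emitted on the stream id it subscribed to, which is what shuffleGrouping(TRANSFORM_BOLT, "StreamOne") expresses in the topology above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of named-stream routing. This is NOT Storm code; all names are hypothetical.
class NamedStreamDemo {
    // stream id -> ids of the consumers subscribed to that stream
    private final Map<String, List<String>> subscribers = new LinkedHashMap<>();
    // consumer id -> values that consumer has received so far
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    // Plays the role of shuffleGrouping(TRANSFORM_BOLT, streamId) in the topology.
    void subscribe(String consumerId, String streamId) {
        subscribers.computeIfAbsent(streamId, k -> new ArrayList<>()).add(consumerId);
        received.computeIfAbsent(consumerId, k -> new ArrayList<>());
    }

    // Plays the role of collector.emit(streamId, ...) in the bolt.
    void emit(String streamId, String value) {
        List<String> targets = subscribers.getOrDefault(streamId, Collections.<String>emptyList());
        for (String consumerId : targets) {
            received.get(consumerId).add(value);
        }
    }

    // Returns everything delivered to the given consumer (empty if unknown).
    List<String> receivedBy(String consumerId) {
        return received.getOrDefault(consumerId, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        NamedStreamDemo demo = new NamedStreamDemo();
        demo.subscribe("process1", "StreamOne");
        demo.subscribe("process2", "StreamTwo");
        demo.subscribe("process3", "StreamThree");

        demo.emit("StreamOne", "a");
        demo.emit("StreamTwo", "b");
        demo.emit("StreamThree", "c");

        System.out.println(demo.receivedBy("process1")); // prints [a]
        System.out.println(demo.receivedBy("process2")); // prints [b]
        System.out.println(demo.receivedBy("process3")); // prints [c]
    }
}
```

In this model, a value emitted on a stream with no subscribers is simply not delivered to anyone, so a stream id in emit only has an effect if some consumer subscribed to that same id.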
Original article: https://www.cnblogs.com/zengdan-develpoer/p/4554708.html