Storm系列(十)聚流示例

功能:将多个数据源的数据汇集到一个处理单元进行集中分类处理;

入口类TestMain

public class TestMain {
 
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("random1", new RandomWordSpout1(), 1);
        builder.setSpout("random2", new RandomWordSpout2(), 1);
        builder.setSpout("random3", new RandomWordSpout3(), 1);
        builder.setBolt("", new TransferBolt(), 1)
                .localOrShuffleGrouping("random1", "stream1")
10                  .localOrShuffleGrouping("random2", "stream2")
11                  .localOrShuffleGrouping("random3", "stream3");
12   
13          Config conf = new Config();
14          conf.setDebug(false);
15          conf.setNumWorkers(1);
16          LocalCluster cluster = new LocalCluster();
17          cluster.submitTopology("test-1", conf, builder.createTopology());
18      }
19  }

数据源类RandomWordSpout1 输出字段为name

public class RandomWordSpout1 extends BaseRichSpout {
 
    private static final long serialVersionUID = -4287209449750623371L;
 
    private SpoutOutputCollector collector;
 
    @Override
    public void open(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, SpoutOutputCollector collector) {
10          this.collector = collector;
11      }
12   
13      @Override
14      public void declareOutputFields(OutputFieldsDeclarer declarer) {
15          declarer.declareStream("stream1", new Fields("name"));
16      }
17   
18      @Override
19      public void nextTuple() {
20          collector.emit("stream1", new Values("RandomWordSpout1"));
21      }
22   
23  }

数据源类RandomWordSpout2 输出字段为content

public class RandomWordSpout2 extends BaseRichSpout {
 
    private SpoutOutputCollector collector;
   
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
 
10      @Override
11      public void declareOutputFields(OutputFieldsDeclarer declarer) {
12          declarer.declareStream("stream2", new Fields("content"));
13      }
14   
15      @Override
16      public void nextTuple() {
17          collector.emit("stream2",new Values("RandomWordSpout2"));
18      }
19   
20  }

数据源类RandomWordSpout3输出key、value两个字段

public class RandomWordSpout3 extends BaseRichSpout {
 
    private SpoutOutputCollector collector;
 
    @Override
    public void open(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
10   
11      @Override
12      public void declareOutputFields(OutputFieldsDeclarer declarer) {
13          declarer.declareStream("stream3", new Fields("key", "value"));
14      }
15   
16      @Override
17      public void nextTuple() {
18          collector.emit("stream3", new Values("chenx","happyday"));
19         
20      }
21   
22  }

聚流处理类TransferBolt,输出从各流获取到的数据

public class TransferBolt extends BaseBasicBolt {
 
    private static final long serialVersionUID = 4223708336037089125L;
    private Map<String, Fields> _fieldMap = null;
 
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
            TopologyContext context) {
        _fieldMap = new HashMap<String, Fields>();
10          Set<GlobalStreamId> sourceSet = context.getThisSources().keySet();
11          for (GlobalStreamId source : sourceSet) {
12              Fields fields = context.getComponentOutputFields(
13                      source.get_componentId(), source.get_streamId());
14              _fieldMap.put(source.get_componentId() + source.get_streamId(),
15                      fields);
16          }
17   
18      }
19   
20      @Override
21      public void declareOutputFields(OutputFieldsDeclarer declarer) {
22      }
23   
24      @Override
25      public void execute(Tuple input, BasicOutputCollector collector) {
26          String key = input.getSourceComponent() + input.getSourceStreamId();
27          Fields fields = _fieldMap.get(key);
28          int size = fields.size();
29          String content = "";
30          for (int i = 0; i < size; i++) {
31              content += input.getStringByField(fields.get(i));
32          }
33          System.out.println("SourceComponent:" + input.getSourceComponent()
34                  + ",SourceStreamId:" + input.getSourceStreamId() + ",content:"
35                  + content);
36      }
37   
38  }
原文地址:https://www.cnblogs.com/jianyuan/p/4830839.html