Storm DRPC 学习

示例代码:

package com.lky.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Hand-wired Storm distributed RPC (DRPC) example.
 *
 * <p>The topology is assembled manually: a {@link DRPCSpout} feeds
 * {@link ExclamationBolt}, whose output is passed on to a {@link ReturnResults}
 * bolt. Everything runs in-process against a {@link LocalDRPC} and a
 * {@link LocalCluster}.
 *
 * @author Administrator
 * @version 1.0
 */
public class manualDRPC {

    private static final Log log = LogFactory.getLog(manualDRPC.class);

    /** DRPC function name; must be identical for the spout and for every client call. */
    private static final String FUNCTION_NAME = "exclation";

    /** Name under which the topology is submitted to (and later killed on) the local cluster. */
    private static final String TOPOLOGY_NAME = "test";

    /**
     * Appends {@code "!!!"} to the first field of each incoming tuple and forwards
     * the second field unchanged alongside it.
     */
    @SuppressWarnings("serial")
    public static class ExclamationBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result", "return-info"));
        }

        /**
         * Emits {@code (arg + "!!!", retInfo)} for the incoming {@code (arg, retInfo)} tuple.
         *
         * @param tuple     input tuple; field 0 is read as a string, field 1 is passed through
         * @param collector collector the result tuple is emitted on
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String arg = tuple.getString(0);
            // Field 1 is forwarded untouched under the "return-info" output field.
            Object retInfo = tuple.getValue(1);
            log.info("execute----------->"+arg+"--------"+retInfo);
            collector.emit(new Values(arg + "!!!", retInfo));
        }
    }

    /**
     * Builds the topology, runs it on a local cluster, issues two DRPC calls and
     * prints their results. The cluster and the DRPC server are always shut down,
     * even when a call throws.
     */
    @Test
    public void test() {
        LocalDRPC drpc = new LocalDRPC(); // in-process DRPC server
        try {
            // Build the topology: spout -> exclamation bolt -> result-returning bolt.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("drpc", new DRPCSpout(FUNCTION_NAME, drpc), 2);
            builder.setBolt("exam", new ExclamationBolt(), 4).shuffleGrouping("drpc");
            builder.setBolt("return", new ReturnResults(), 4).shuffleGrouping("exam");

            // Configure the topology.
            Config config = new Config();
            config.setDebug(true);
            config.setMaxSpoutPending(1000);
            config.setNumWorkers(2);

            // Local (in-process) cluster.
            LocalCluster localCluster = new LocalCluster();
            try {
                localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
                try {
                    // Client side: call the DRPC function and print what comes back.
                    System.out.println("-------------" + drpc.execute(FUNCTION_NAME, "luo") + "-------------");
                    System.out.println("-------------" + drpc.execute(FUNCTION_NAME, "lky") + "-------------");

                    Utils.sleep(1000 * 10);
                } finally {
                    localCluster.killTopology(TOPOLOGY_NAME);
                }
            } finally {
                localCluster.shutdown();
            }
        } finally {
            // Stop the DRPC server last so the spout is not left polling a server
            // that has already gone away while the topology is still running.
            drpc.shutdown();
        }
    }

}
package com.lky.test;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * @ClassName: drpcTopology
 * @Description:线性drpc学习
 * @author Administrator
 * @date 2015年10月23日
 *
 */
public class drpcTopology {

    @SuppressWarnings("serial")
    public static class Exclation extends BaseRichBolt {
        private static Log log = LogFactory.getLog(Exclation.class);
        private OutputCollector _outputCollector;

        @Override
        public void execute(Tuple tuple) {
            String res = null;
            try {
                res = tuple.getString(1);
                if (null != res) {
                    _outputCollector.emit(new Values(tuple.getValue(0), res + "!!!!"));
                    log.info("execute 发射消息-------->" + res);
                }

            } catch (Exception e) {
                log.error("execute处理异常!!!");
            }
        }

        @Override
        public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector outputCollector) {
            this._outputCollector = outputCollector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    @SuppressWarnings("deprecation")
    @Test
    public void testDRPC() {
        // LinearDRPCTopologyBuilder帮助我们自动集成了DRPCSpout和returnResults(bolt)
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("text");
        builder.addBolt(new Exclation(), 3);

        Config config = new Config();
        LocalDRPC drpc = new LocalDRPC();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drpc-test", config, builder.createLocalTopology(drpc));

        for (String word : new String[] { "hello", "word" }) {
            System.out.println("result "" + word + "": " + drpc.execute("text", word));
        }

        Utils.sleep(1000 * 5);
        drpc.shutdown();
        cluster.killTopology("drpc-test");
        cluster.shutdown();
    }

}
原文地址:https://www.cnblogs.com/dmir/p/4905644.html