storm WC 2 (根据日志)

1.产生虚拟日志

package les7.readFileTopo;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;

public class GetData {

    /**
     * Generates a fake access log named "track.log" in the working directory:
     * 5000 lines, each "host \t session_id \t timestamp".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        File logFile = new File("track.log");
        Random random = new Random();

        String[] hosts = { "movie information" };
        String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U12", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
        String[] time = { "2019-03-07 08:40:50", "2019-03-07 08:40:51", "2019-03-07 08:40:52", "2019-03-07 08:40:53",
                "2019-03-07 09:40:49", "2019-03-07 10:40:49", "2019-03-07 11:40:49", "2019-03-07 12:40:49" };

        // StringBuilder (unsynchronized) over StringBuffer — single-threaded use.
        // Use explicit \t / \n escapes instead of raw control characters in the
        // literal, and index nextInt() by array length instead of magic 5 / 8.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5000; i++) {
            sb.append(hosts[0]).append('\t')
              .append(session_id[random.nextInt(session_id.length)]).append('\t')
              .append(time[random.nextInt(time.length)]).append('\n');
        }

        // FileOutputStream creates the file if it does not exist, so the
        // explicit exists()/createNewFile() dance is unnecessary.
        // try-with-resources guarantees the stream is closed even on failure;
        // encode with an explicit charset rather than the platform default.
        try (FileOutputStream fs = new FileOutputStream(logFile)) {
            fs.write(sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
View Code

2.spout自定义数据流入拓扑逻辑

package les7.readFileTopo;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReadFileSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    // Reader chain over the generated log file; opened in open(), released in close().
    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;

    SpoutOutputCollector collector = null;

    /**
     * Lifecycle init: stores the collector and opens "track.log" as a
     * UTF-8 buffered reader. Called once per task before nextTuple().
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            this.fis = new FileInputStream("track.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Releases the reader chain; closing the outermost reader closes the wrapped streams too. */
    @Override
    public void close() {
        try {
            if (br != null) {
                br.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {
        // No pause/resume state to restore.
    }

    @Override
    public void deactivate() {
        // No pause/resume state to save.
    }

    /**
     * Core method, invoked repeatedly by the framework. Emits at most ONE
     * log line per call (the original drained the whole file in a single
     * call, monopolizing the executor thread), and no longer swallows
     * read errors in an empty catch block.
     */
    @Override
    public void nextTuple() {
        try {
            String line = br.readLine();
            if (line != null) {
                collector.emit(new Values(line));
            }
            // line == null: end of file reached; nothing left to emit.
        } catch (Exception e) {
            // Surface read failures in the worker log instead of dropping them.
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
        // With ackers enabled, called when a tuple tree completes successfully.
        // No replay bookkeeping is kept in this demo.
    }

    @Override
    public void fail(Object msgId) {
        // With ackers enabled, called when a tuple tree fails; a production
        // spout would re-emit the message identified by msgId here.
    }

    /** Declares the single output column carrying one raw log line. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // No per-component config overrides.
        return null;
    }

}
View Code

3.bolt处理逻辑

package les7.readFileTopo;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class FileBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;

    /** Lifecycle init: stores the collector used to emit and ack tuples. */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Core method: splits each incoming log line on tabs and emits one
     * (word, 1) pair per field. The original kept unused fields (num, a
     * shadowed words array) — removed — and never acked the tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        String line = tuple.getString(0);

        // Explicit \t escape instead of a literal tab character in the source.
        for (String word : line.split("\t")) {
            collector.emit(new Values(word, 1));
        }

        // IRichBolt does not auto-ack; ack so the tuple tree completes
        // when ackers are configured.
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        // Nothing to release.
    }

    /** Declares the two output columns consumed downstream. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // No per-component config overrides.
        return null;
    }

}
View Code

4.bolt输出逻辑

package les7.readFileTopo;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import static org.apache.htrace.Tracer.LOG;

public class PrintBolt implements IRichBolt {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private Map<String, Integer> result = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;

    }

    @Override
    public void execute(Tuple tuple) {

        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        //求和
        if(result.containsKey(word)){
            //如果已经存在,累加
            int total = result.get(word);
            result.put(word, total+count);
        }else{
            //这是一个新单词
            result.put(word, count);
        }

        //输出到屏幕
        System.out.println("统计的结果是:" + result);

        //输出给下一个组件                                               单词           总频率
        this.collector.emit(new Values(word,result.get(word)));
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}
View Code

5.书写拓扑逻辑代码

package les7.readFileTopo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

//展示信息数据

public class FileCountTopo {

    /**
     * Entry point. Wires the topology
     * ReadFileSpout -> FileBolt -> PrintBolt
     * and submits it: to the cluster when a topology name is passed as the
     * first argument, otherwise to an in-process LocalCluster.
     *
     * @param args optional; args[0] is the topology name for remote submission
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // One spout task feeding two splitter bolts, funnelled into a single
        // counting/printing bolt.
        topologyBuilder.setSpout("spout", new ReadFileSpout(), 1);
        topologyBuilder.setBolt("b1", new FileBolt(), 2).shuffleGrouping("spout");
        topologyBuilder.setBolt("PrintBolt", new PrintBolt(), 1).shuffleGrouping("b1");

        Config config = new Config();
        config.setDebug(true);

        if (args.length > 0) {
            // Remote submission: first CLI argument names the topology.
            try {
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run inside an embedded local cluster.
            new LocalCluster().submitTopology("mytopology", config, topologyBuilder.createTopology());
        }
    }

}
View Code
RUSH B
原文地址:https://www.cnblogs.com/tangsonghuai/p/11169032.html