Storm 实时读取本地文件操作(模拟分析网络日志)

  • WebLogProduct
    产生日志类
package top.wintp.weblog;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

/**
 * Generates a sample web access log for the Storm demo.
 *
 * <p>Every record is a single line made of three tab-separated fields: site URL, user session
 * id and visit time.
 *
 * <p>Author: upuptop (QQ 337081267, pyfysf@163.com)<br>
 * CSDN: http://blog.csdn.net/pyfysf<br>
 * cnblogs: http://www.cnblogs.com/upuptop<br>
 * Blog: http://wintp.top<br>
 * Created: 2019/6/5
 */
public class WebLogProduct {

    /** Output file used when no path is passed on the command line. */
    private static final String DEFAULT_LOG_PATH = "E:/weblog.log";

    /** Number of records written per run. */
    private static final int RECORD_COUNT = 30;

    /**
     * Builds {@code RECORD_COUNT} random records and writes them to the log file, replacing any
     * previous content. Prints a success or failure message to standard output.
     *
     * @param args optional; {@code args[0]} overrides the output path, otherwise
     *             {@code E:/weblog.log} is used
     */
    public static void main(String[] args) {
        String logPath = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;

        // Sites
        String[] webUrl = {
                "http://www.wintp.top",
                "http://upuptop.top",
                "http://github.com",
                "http://gitee.com"};
        // User session ids
        String[] userSessionId = {
                "F5CC242E006B4A81BDE72E03BC7BD34D",
                "FA8C3631CE024ED5B98A65EE7F2600E2",
                "52EF144D16C24AC0912003539654824A",
                "F78F115B613A495F961B194EB2A377C6"};
        // Visit times
        String[] scanDate = {
                "2019-6-5 23:29:00",
                "2019-4-5 22:39:20",
                "2019-7-5 21:09:05",
                "2019-8-5 20:19:08"
        };

        // One generator for the whole run instead of a new instance per record.
        Random random = new Random();
        StringBuilder sb = new StringBuilder();

        for (int i = 0; i < RECORD_COUNT; i++) {
            // The same index is used for all three arrays, as before, so the i-th URL always
            // goes with the i-th session id and date. Bounding by the array length (rather
            // than a literal 3) makes the last entry selectable too.
            int randomIndex = random.nextInt(webUrl.length);

            sb.append(webUrl[randomIndex])
                    .append('\t')
                    .append(userSessionId[randomIndex])
                    .append('\t')
                    .append(scanDate[randomIndex])
                    .append('\n');
        }

        // try-with-resources closes the stream on every path.
        try (FileOutputStream fos = new FileOutputStream(logPath)) {
            // Explicit charset so the file content does not depend on the platform default.
            fos.write(sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));

            System.out.println("write weblog.log success");
        } catch (IOException e) {
            // Also covers FileNotFoundException, which is an IOException.
            e.printStackTrace();
            System.out.println("write weblog.log fail");
        }
    }

}

  • WebLogSpout

    读取日志类

package top.wintp.weblog;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

/**
 * @description: description:
 * <p>
 * @author: upuptop
 * <p>
 * @qq: 337081267
 * <p>
 * @CSDN: http://blog.csdn.net/pyfysf
 * <p>
 * @cnblogs: http://www.cnblogs.com/upuptop
 * <p>
 * @blog: http://wintp.top
 * <p>
 * @email: pyfysf@163.com
 * <p>
 * @time: 2019/06/2019/6/5
 * <p>
 */
public class WebLogSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private BufferedReader mReader;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //向下写出数据使用
        this.collector = spoutOutputCollector;
        //    初始化方法 读取文件
        try {
            mReader = new BufferedReader(new FileReader("E:/weblog.log"));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    String log = null;

    public void nextTuple() {
        //循环调用该方法 在这里面读取数据
        try {
            while (null != (log = mReader.readLine())) {
                //写出数据
                this.collector.emit(new Values(log));
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //写出数据的名称
        outputFieldsDeclarer.declare(new Fields("log"));
    }
}

  • WebLogBlot
    处理日志类
package top.wintp.weblog;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebLogBlot extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(WebLogBlot.class);
    /**
     * 统计数据个数
     */
    private int count;

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        count++;

        //    处理数据的方法
        //http://www.wintp.top	F5CC242E006B4A81BDE72E03BC7BD34D	2019-6-5 23:29:00
        String log = tuple.getStringByField("log");

        String[] split = log.split("	");
        String webUrl = split[0];
        String userSessionId = split[1];

        logger.error("WebLogBlot  execute()   sessionId:{},ThreadId:{},webUrl:{},count:{}   ", userSessionId, Thread.currentThread().getId(), webUrl, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //不需要在向下写出数据
    }
}

  • WebLogDevice
    提交拓扑测试类
package top.wintp.weblog;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class WebLogDevice {
    public static void main(String[] args) {
        //    创建拓扑
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //    设置spout bolt
        topologyBuilder.setSpout("WebLogSpout", new WebLogSpout(), 1);
        topologyBuilder.setBolt("WebLogBolt", new WebLogBlot(), 1).shuffleGrouping("WebLogSpout");

        //    获取配置
        Config config = new Config();
        //    设置workers
        config.setNumWorkers(1);

        //    提交拓扑
        if (args.length > 0) {
            //    集群提交
            try {
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            //    本地提交
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("WebLogTopology", config, topologyBuilder.createTopology());
        }

    }

}

  • 控制台打印结果

在这里插入图片描述

不关闭程序,修改日志文件(注意:字段之间必须使用制表符 Tab 分隔),控制台会追加打印新的结果

在这里插入图片描述

原文地址:https://www.cnblogs.com/shaofeer/p/11154275.html