Transferring logs into HBase via MapReduce (AnalyserLogDataRunner and AnalyserLogDataMapper)

Before running the code, start everything up: the Hadoop cluster (start-all.sh), ZooKeeper (zkServer.sh start), the job history server (mr-jobhistory-daemon.sh start historyserver), and HBase (start-hbase.sh).

Then create the target table in the HBase shell (table eventlog with a single column family log):

create 'eventlog','log';
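If you prefer to create the table from code instead of the shell, here is a minimal Java sketch using the HBase 1.x client Admin API (the ZooKeeper quorum value is an assumption, mirroring the setConf values further down):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateEventLogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2"); // assumption: same quorum as in setConf below
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("eventlog");
            if (!admin.tableExists(table)) {
                HTableDescriptor desc = new HTableDescriptor(table);
                desc.addFamily(new HColumnDescriptor("log")); // one column family named "log"
                admin.createTable(desc);
            }
        }
    }
}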

The AnalyserLogDataRunner class

The code below may throw connection errors; if so, add the following two lines (they belong in setConf(), as shown in the full code):

configuration.set("hbase.master", "master:60000");
configuration.set("hbase.zookeeper.property.clientPort", "2181");

Getting the input path: it can also be set as shown below; it is just a different way of writing the same thing.
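The original screenshot of that alternative is missing, so here is a minimal sketch of what it looks like, using the same /project/log/yyyyMMdd layout as setJobInputPaths in the full code below (note that setInputPaths replaces any previously set paths rather than appending):

// inside run(), instead of calling this.setJobInputPaths(job):
String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
Path inputPath = new Path("/project/log/"
        + TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyyMMdd") + "/");
FileInputFormat.setInputPaths(job, inputPath);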

The AnalyserLogDataMapper class

The row keys generated this way would otherwise be very long; running the variable fields through CRC32 keeps them short (see generateRowKey in the Mapper). The complete code of both classes follows.
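To see the effect, here is a small standalone sketch (the sample field values are made up) that applies the same java.util.zip.CRC32 logic as generateRowKey in the Mapper below:

import java.util.zip.CRC32;

public class RowKeyDemo {
    public static void main(String[] args) {
        String serverTime = "1553212800000";
        String uuid = "550e8400-e29b-41d4-a716-446655440000"; // hypothetical sample values
        String memberId = "member_0001";
        String eventAlias = "e_l";

        // naive key: concatenate every field -- about 66 characters here
        String longKey = serverTime + "_" + uuid + "_" + memberId + "_" + eventAlias;

        // CRC32 key: hash the variable fields down to at most 8 digits (mod 10^8)
        CRC32 crc32 = new CRC32();
        crc32.update(uuid.getBytes());
        crc32.update(memberId.getBytes());
        crc32.update(eventAlias.getBytes());
        String shortKey = serverTime + "_" + (crc32.getValue() % 100000000L);

        System.out.println(longKey.length() + " vs " + shortKey.length()); // e.g. 66 vs ~22
    }
}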

AnalyserLogDataRunner (full code):

package com.yjsj.etl.mr;

import com.yjsj.common.EventLogConstants;
import com.yjsj.common.GlobalConstants;
import com.yjsj.util.TimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//import java.util.logging.Logger;
import org.apache.log4j.Logger;

import java.io.IOException;


public class AnalyserLogDataRunner implements Tool {
    //public static final Logger log=Logger.getGlobal();
    public static final Logger log=Logger.getLogger(AnalyserLogDataRunner.class);
    // note: this is the log4j Logger, not java.util.logging
    private Configuration conf=null;

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(),new AnalyserLogDataRunner(),args);
        } catch (Exception e) {
            log.error("log-parsing job failed",e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public void setConf(Configuration configuration) {

        configuration.set("hbase.zookeeper.quorum", "master,node1,node2");
        configuration.set("fs.defaultFS","hdfs://master:9000");
        configuration.set("hbase.master", "master:60000");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        this.conf=HBaseConfiguration.create(configuration);

    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf=this.getConf();
        this.processArgs(conf,args);
        Job job=Job.getInstance(conf,"analyser_logdata");
        // to submit from the IDE but run on the cluster, code like the following is needed:
        //File jarFile=EJob.createTempJar("target/classes");
        //((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        // end of submit-locally/run-on-cluster code
        job.setJarByClass(AnalyserLogDataRunner.class);
        job.setMapperClass(AnalyserLogDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);
        // reducer setup
        // 1. running on the cluster from a jar (requires addDependencyJars=true, which is the default)
        //TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS,null,job);
        // 2. running locally (requires addDependencyJars=false)
        TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS,null,job,null,null,null,null,false);
        job.setNumReduceTasks(0);// the first argument above is the table name constant, which wraps the value "eventlog"
        this.setJobInputPaths(job);
        return job.waitForCompletion(true)?0:-1;
    }
    private void setJobInputPaths(Job job){
        Configuration conf=job.getConfiguration();
        FileSystem fs=null;
        try {
            fs=FileSystem.get(conf);
            String date=conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
            Path inputPath=new Path("/project/log/"+TimeUtil.parseLong2String(
                    TimeUtil.parseString2Long(date),"yyyyMMdd"
            )+"/");
            if (fs.exists(inputPath)){
                FileInputFormat.addInputPath(job,inputPath);
            }else {
                throw new RuntimeException("文件不存在:"+inputPath);
            }
            System.out.println("*******"+inputPath.toString());
        } catch (IOException e) {
            throw new RuntimeException("设置job的mapreduce输入路径出现异常",e);
        }finally {
            if (fs!=null){
                try {
                    fs.close();
                } catch (IOException e) {
                    //e.printStackTrace();
                }
            }
        }

    }
    private void processArgs(Configuration conf,String[] args){
        String date=null;
        for (int i=0;i<args.length;i++){
            if("-d".equals(args[i])){
                if (i+1<args.length){
                    date=args[++i];
                    break;
                }
            }
        }
        System.out.println("------"+date);
        // the date must be in yyyy-MM-dd format
        // note: StringUtils here is from the org.apache.commons.lang package
        if (StringUtils.isBlank(date)||!TimeUtil.isValidateRunningDate(date)){
            // date is invalid; fall back to yesterday
            date=TimeUtil.getYesterday();
            System.out.println(date);
        }
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES,date);
    }
}
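That completes the Runner. Once both classes are built into a jar, the job is submitted with the run date passed via -d in yyyy-MM-dd format (processArgs falls back to yesterday when the flag is missing or invalid); a hypothetical invocation, with the jar name being an assumption:

hadoop jar log-etl.jar com.yjsj.etl.mr.AnalyserLogDataRunner -d 2019-03-20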
AnalyserLogDataMapper (full code):

package com.yjsj.etl.mr;

import com.yjsj.common.EventLogConstants;
import com.yjsj.common.GlobalConstants;
import com.yjsj.etl.util.LoggerUtil;
import com.yjsj.util.TimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;


public class AnalyserLogDataMapper extends Mapper<LongWritable,Text,NullWritable,Put> {
    private final Logger logger=Logger.getLogger(AnalyserLogDataMapper.class);
    private int inputRecords,filterRecords,outputRecords;// counters that make it easy to see how much data was filtered
    private byte[] family=Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    private CRC32 crc32=new CRC32();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        this.inputRecords++;
        this.logger.debug("Analyse data of:"+value);
        try {
            // parse the log line
            Map<String,String> clientInfo=LoggerUtil.handleLog(value.toString());
            // filter out records that failed to parse
            if (clientInfo.isEmpty()){
                this.filterRecords++;
                return;
            }
            String eventAliasName =clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME);
            EventLogConstants.EventEnum event= EventLogConstants.EventEnum.valueOfAlias(eventAliasName);
            switch (event){
                case LAUNCH:
                case PAGEVIEW:
                case CHARGEREQUEST:
                case CHARGEREFUND:
                case CHARGESUCCESS:
                case EVENT:
                    // process the event data
                    this.handleData(clientInfo,event,context);
                    break;
                default:
                    this.filterRecords++;
                    this.logger.warn("cannot parse this event, alias: "+eventAliasName);
            }
        } catch (Exception e) {
            this.filterRecords++;
            this.logger.error("exception while processing record: "+value,e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        logger.info("input records: "+this.inputRecords+", output records: "+this.outputRecords+", filtered records: "+this.filterRecords);
    }
    private void handleData(Map<String,String> clientInfo, EventLogConstants.EventEnum event,Context context)
    throws IOException,InterruptedException{
        String uuid=clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        String memberId=clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
        String serverTime=clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
        if (StringUtils.isNotBlank(serverTime)){
            // the server time must be non-blank
            clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);// drop the raw browser user-agent info
            String rowkey=this.generateRowKey(uuid,memberId,event.alias,serverTime);//timestamp
            Put put=new Put(Bytes.toBytes(rowkey));
            for (Map.Entry<String,String> entry:clientInfo.entrySet()){
                if (StringUtils.isNotBlank(entry.getKey())&&StringUtils.isNotBlank(entry.getValue())){
                    put.add(family,Bytes.toBytes(entry.getKey()),Bytes.toBytes(entry.getValue()));
                }
            }
            context.write(NullWritable.get(),put);
            this.outputRecords++;
        }else {
            this.filterRecords++;
        }
    }
    private String generateRowKey(String uuid,String memberId,String eventAliasName,String serverTime){
        StringBuilder sb=new StringBuilder();
        sb.append(serverTime).append("_");
        this.crc32.reset();
        if (StringUtils.isNotBlank(uuid)){
            this.crc32.update(uuid.getBytes());
        }
        if (StringUtils.isNotBlank(memberId)){
            this.crc32.update(memberId.getBytes());
        }
        this.crc32.update(eventAliasName.getBytes());
        sb.append(this.crc32.getValue()%100000000L);
        return sb.toString();
    }
}
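After a successful run, the loaded rows can be spot-checked from the HBase shell, for example:

scan 'eventlog', {LIMIT => 1}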
Original source: https://www.cnblogs.com/pursue339/p/10655446.html