MR框架-->ETL

  • ETL介绍和实现

1.ETL的介绍:

    在上面的案例中我们的操作虽然完成,但是我们发现一个问题,都是对一个166mb大小的文件操作,效率是非常低下的,ETL是用来描述:将数据从来源段经过抽取(extract)  转换(transform) 加载(load)  到目的端的过程。

2.ETL的案例

    针对日志文件,截取我们需要的字段,去除不必要的字段

public static class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            //读取一行日志
            String log = value.toString();
            //通过LogParser解析获取这些字段
            Map<String, String> map = new LogParser().parse(log); 
            String ip = map.get("ip");
            String url = map.get("url");
            String sessionId = map.get("sessionId");
            String  time = map.get("time");
            //通过IPParser获取国家省市
            RegionInfo analyseIp = IPParser.getInstance().analyseIp(ip);
            
            String counttry = analyseIp.getCountry() == null ? "-" : analyseIp.getCountry();
            String province = analyseIp.getProvince() == null ? "-" : analyseIp.getProvince();
            String city   =      analyseIp.getCity() == null ? "-" : analyseIp.getCity();
            //Buffer
            StringBuffer sb = new StringBuffer();
            sb.append(ip +"	");
            sb.append(url +"	");
            sb.append(sessionId +"	");
            sb.append(time +"	");
            sb.append(counttry +"	");
            sb.append(province +"	");
            sb.append(city);
            
            //写入上下文
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }

提交主类:

public static void main(String[] args) throws Exception {
        // 加载配置文件
        Configuration conf = new Configuration();
        //创建hdfs对象
        FileSystem fs = FileSystem.get(conf);
        //判断输出路径是否重复
        if(fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]),true);
        }
        // 创建Job对象
        Job job = Job.getInstance(conf);
        // 设置提交主类
        job.setJarByClass(ETLApp.class);
        // 设置Mapper类相关的参数
        job.setMapperClass(ETLMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 设置输入路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交任务
        job.waitForCompletion(true);
    }
原文地址:https://www.cnblogs.com/wyk1/p/13955449.html