13.MapReduce应用

一、Join多种应用

1.1 Reduce Join

Reduce Join工作原理:

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段打标志)分开,最后进行合并即可。

案例:

需求: 将商品信息表中数据根据商品pid合并到订单数据表中
order.txt:

id pid amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
pd.txt:
pid pname
-- --
01 小米
02 华为
03 联想
期望获得数据:
id pname amount
-- -- --
1001 小米 1
1004 小米 4
1002 华为 2
1003 格力 3

代码实现:

OrderBean实体:

public class OrderBean implements WritableComparable<OrderBean> {

    private String id;
    private String pid;
    private int amount;
    private String pname;

    public OrderBean() {
    }


    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid);
        if (compare == 0) {
            return o.pname.compareTo(this.pname);
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
    }
    //省略getter、setter、toString方法
    ...
}

Mapper类

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    private OrderBean orderBean = new OrderBean();
    private String fileName;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit();
        fileName = fs.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("	");
        //根据文件名来创建OrderBean对象
        if ("order.txt".equals(fileName)){
            orderBean.setId(fields[0]);
            orderBean.setPid(fields[1]);
            orderBean.setAmount(Integer.parseInt(fields[2]));
            orderBean.setPname("");
        }else {
            orderBean.setPid(fields[0]);
            orderBean.setPname(fields[1]);
            orderBean.setId("");
            orderBean.setAmount(0);
        }
        context.write(orderBean,NullWritable.get());
    }
}

Reducer类:

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<NullWritable> vars = values.iterator();
        //指针下移获取第一个OrderBean
        vars.next();
        String pname = key.getPname();
        while (vars.hasNext()) {
            //指针下移,其对应的key也变化了
            vars.next();
            key.setPname(pname);
            context.write(key, NullWritable.get());
        }
    }
}

分组Comparator类:

public class OrderComparator extends WritableComparator {
    public OrderComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}

驱动Driver

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        //设置分组Comparator
        job.setGroupingComparatorClass(OrderComparator.class);

        FileInputFormat.setInputPaths(job, new Path("D:\MyFile\test"));
        //指定_SUCCESS文件的位置
        FileOutputFormat.setOutputPath(job, new Path("d:\output"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

缺点:
Reduce Join合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

解决方案: 使用Map Join

1.2 Map Join

使用场景:

Map Join适用于一张表非常小、另一表非常大的场景。

Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据压力,尽可能的减少数据倾斜。

实现方式:

  1. DistributedCacheDriver缓存小文件
  2. MapsetUp()方法中读取缓存文件

代码:

Mapper类:

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    private OrderBean orderBean = new OrderBean();
    private Map<String, String> pMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();
        /**
         * 使用FSDataInputStream会中文乱码
         */
//        FileSystem fs = FileSystem.get(context.getConfiguration());
//        FSDataInputStream fis = fs.open(new Path(path));
        BufferedReader fis = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = fis.readLine())) {
            String[] fields = line.split("	");
            pMap.put(fields[0], fields[1]);
        }
        IOUtils.closeStream(fis);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("	");
        String pname = pMap.get(fields[1]);
        pname = pname == null ? "" : pname;
        orderBean.setId(fields[0]);
        orderBean.setPid(fields[1]);
        orderBean.setAmount(Integer.parseInt(fields[2]));
        orderBean.setPname(pname);
        context.write(orderBean, NullWritable.get());
    }
}

驱动Driver

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);

        FileInputFormat.setInputPaths(job, new Path("D:\MyFile\test"));
        //指定_SUCCESS文件的位置
        FileOutputFormat.setOutputPath(job, new Path("d:\output"));

        //加载缓存数据
        job.addCacheFile(new URI("file:///d:/MyFile/cache/pd.txt"));
        //Map端Join的逻辑不需要Reduce阶段,设置ReduceTask数量为0
        job.setNumReduceTasks(0);


        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

二、计数器应用

Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户监控已处理的输入数据量和已产生的输出数据量。

  1. 采用枚举的方式统计计数
enum MyCounter{MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
  1. 采用计数组、计数器名称的方式统计
context.getCounter("counterGroup","counter").increment(1);
  1. 计数结果在程序运行后的控制台上查看

三、数据清洗(ETL)

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

需求: 去除日志中字段长度小于等于11的日志
在这里插入图片描述Mappper类:

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		
		// 1 获取1行数据
		String line = value.toString();
		
		// 2 解析日志
		boolean result = parseLog(line,context);
		
		// 3 日志不合法退出
		if (!result) {
			return;
		}
		
		// 4 设置key
		k.set(line);
		
		// 5 写出数据
		context.write(k, NullWritable.get());
	}

	// 2 解析日志
	private boolean parseLog(String line, Context context) {

		// 1 截取
		String[] fields = line.split(" ");
		
		// 2 日志长度大于11的为合法
		if (fields.length > 11) {

			// 系统计数器
			context.getCounter("map", "true").increment(1);
			return true;
		}else {
			context.getCounter("map", "false").increment(1);
			return false;
		}
	}
}

驱动Driver:

public class LogDriver {

	public static void main(String[] args) throws Exception {
		// 1 获取job信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2 加载jar包
		job.setJarByClass(LogDriver.class);

		// 3 关联map
		job.setMapperClass(LogMapper.class);

		// 4 设置最终输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 设置reducetask个数为0
		job.setNumReduceTasks(0);

		// 5 设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 6 提交
		job.waitForCompletion(true);
	}
}

计数效果:
在这里插入图片描述

四、MapReduce开发总结

在编写MapReduce程序时,需要考虑的几个方面:

①输入数据接口:InputFormat

  1. 默认使用的实现类是:TextInputFormat
  2. TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
  3. KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为keyvalue。默认分隔符是tab ( )
  4. NlineInputFormat按照指定的行数N来划分切片。
  5. CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率
  6. 用户还可以自定义InputFormat

②逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map()setup()cleanup ()

Partitioner分区

有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces

如果业务上有特别的需求,可以自定义分区。

Comparable排序

当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法

部分排序:对最终输出的没一个文件进行内部排序
全排序:对所有数据进行排序,通常只有一个Reduce
二次排序:排序的条件有两个
辅助排序:可以让不同的key进入到同一个ReduceTask

Combiner合并

Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

Reduce端分组:Groupingcomparator

Reduce端对key进行分组。应用于:在接收的KeyBean对象时,想让一个或几个字段相同(全部字段比较不相同)的Key进入到同一个Reduce方法时,可以采用分组排序。

⑦逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法: reduce()setup()cleanup()

⑧输出数据接口:OutputFormat

默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对向目标文本文件中输出为一行。

用户还可以自定义OutputFormat

原文地址:https://www.cnblogs.com/hucheng1997/p/13083270.html