Combiner合并

一、概念

  1、Combiner是MapReduce程序中Mapper和Reducer之外的一种组件

  2、Combiner组件的父类就是Reducer

  3、Combiner和Reduce的区别主要在于运行位置

      Combiner是在每一个MapTask所在的节点运行

      Reduce是在接收全局所有Mapper的输出结果后执行

  4、Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量

  5、Combiner要在不影响最终逻辑业务的情况下使用,而且,Combiner的输出kv要和Reduce的输入kv类型对应起来。

二、项目介绍

   1、待处理文本 

    
zhangsan jinghang
zhangsan jinghang
lisi wangwu
zhaoliu    zhangsan
xiaoming    zhaoliu
data.txt

  2、需求

    统计每个单词出现的次数

  3、WcMapper.java  

      
package com.jh.wccount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * KEYIN, :hadoop处理文本后输入到mapper中key值
 * VALUEIN, hadoop处理文本后输入到mapper中value值
 * KEYOUT, mapper处理文本后输入到context(reducer)中key值
 * VALUEOUT:mapper处理文本后输入到context(reducer)中value值
 * */

/**
 *  LongWritable :偏移量,Text:文本内容,Text:输出到reduce的键(单词字符串),
 *  IntWritable:输出到reduce端的值(每个单词出现的次数)
 */
/**
 * Word-count mapper: for every word on an input line, emits the pair (word, 1).
 *
 * Generic parameters: LongWritable = byte offset of the line within the split,
 * Text = the line content, Text = output key (the word),
 * IntWritable = output value (the constant count 1).
 */
public class WcMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    // Reused output key object; Hadoop encourages object reuse to avoid
    // allocating a new Text per record.
    private Text keyout = new Text();
    // Constant output value: each occurrence contributes a count of 1.
    private IntWritable valueout = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the content of the current line.
        String line = value.toString();
        // Split on runs of whitespace. The sample data separates some words
        // with multiple consecutive spaces, and split(" ") would produce
        // empty tokens there, which would then be counted as "words".
        String[] words = line.split("\\s+");

        // Emit (word, 1) for each non-empty token.
        for (String word : words) {
            if (word.isEmpty()) {
                // A line starting with whitespace yields one leading empty token.
                continue;
            }
            keyout.set(word);
            context.write(keyout,valueout);
        }
    }

}
WcMapper

  4、WcCombiner.java   

      
package com.jh.wccount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Combiner for the word-count job: performs a local (per-MapTask) summation
 * of the counts produced by the mapper before they are shuffled.
 *
 * A combiner extends Reducer; its input key/value types match the mapper's
 * output, and its output types must match the reducer's input. It is only
 * valid here because summing partial sums does not change the final totals,
 * while it reduces the amount of data sent over the network.
 */
public class WcCombiner extends Reducer<Text,IntWritable,Text,IntWritable>  {
    // Reused writable holding the locally aggregated count for a word.
    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Accumulate all partial counts seen for this word on this MapTask.
        int partialSum = 0;
        for (IntWritable count : values) {
            partialSum += count.get();
        }
        // Write the local subtotal; the real reducer will merge subtotals
        // coming from every MapTask.
        total.set(partialSum);
        context.write(key,total);
    }
}
WcCombiner

  5、WcReduce.java    

      
package com.jh.wccount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for the word-count job: receives all counts for one word
 * (grouped by key across every mapper/combiner) and emits the grand total.
 */
public class WcReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    // Reused writable holding the final count for the current word.
    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // All values for the same key arrive together as one group;
        // summing them yields how many times the word occurred overall.
        int wordCount = 0;
        for (IntWritable count : values) {
            wordCount += count.get();
        }
        // Emit (word, total occurrences).
        total.set(wordCount);
        context.write(key,total);
    }
}
WcReduce

  6、WcDriver.java   

          
package com.jh.wccount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * Driver for the word-count job: wires up the mapper, combiner, and reducer,
 * sets the I/O types and paths, and submits the job to the cluster.
 *
 * Usage: WcDriver &lt;input path&gt; &lt;output path&gt;
 */
public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a clear message instead of an
        // ArrayIndexOutOfBoundsException when paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: WcDriver <input path> <output path>");
            System.exit(2);
        }

        // Obtain the configuration and job objects.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the jar by locating this driver class.
        job.setJarByClass(WcDriver.class);

        // Register the mapper and reducer implementations.
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduce.class);

        // Declare the map-output and final-output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Use WcCombiner to pre-aggregate map output on each MapTask,
        // cutting down the data shuffled to the reducers. Safe here because
        // summing partial counts does not change the final result.
        job.setCombinerClass(WcCombiner.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // To process many small files, switch to CombineTextInputFormat and
        // set a custom maximum split size; the default is TextInputFormat.
        //job.setInputFormatClass(CombineTextInputFormat.class);
        // Maximum virtual-storage split size: 4 MB.
        //CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

        // Submit the job and wait; exit 0 on success, 1 on failure.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0:1);
    }
}
WcDriver

  7、输出结果为

    

原文地址:https://www.cnblogs.com/si-137/p/13415411.html