自定义分区随机分配解决数据倾斜的问题

1、第一阶段有三个文本待统计(设置分区的个数为3)

package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper extends Mapper<LongWritable,Text, Text,IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        String line = value.toString();
        String[] arr = line.split(" ");
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();

        for (String s : arr){
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut,valueOut);
        }



    }
}

package com.cr.skew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
       for(IntWritable iw : values){
           count += iw.get();
       }
       context.write(key,new IntWritable(count));
    }
}

package com.cr.skew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("SkewApp");                 //设置job名称
        job.setJarByClass(SkewApp.class);              //设置搜索类
        job.setInputFormatClass(TextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\skew")));
        //设置输出路径
        Path path = new Path("D:\skew\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper.class);               //设置mapper类
        job.setReducerClass(SkewReducer.class);               //设置reduecer类

        job.setMapOutputKeyClass(Text.class);            //设置之map输出key
        job.setMapOutputValueClass(IntWritable.class);   //设置map输出value

        job.setOutputKeyClass(Text.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value


        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
输出
part-r-00000

world3	3

part-r-00001

world1	3
world4	3
part-r-00002
hello	15
world2	3
world5	3

2、第二阶段设置随机分区函数

package com.cr.skew1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

//自定义分区数
public class RandomPartition extends Partitioner<Text,IntWritable>{
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitioners) {
        //生成0-numPartitioners的随机数
        return new Random().nextInt(numPartitioners);
    }
}

输出三个分区
hello	7
world1	2
world2	1
world3	1
world5	1
hello	4
world2	2
world3	2
hello	4
world1	1
world4	3
world5	2

3、对上面的reduce聚合进行再次mapper_reducer聚合

package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<LongWritable,Text, Text,IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        String line = value.toString();
        String[] arr = line.split("	");

        context.write(new Text(arr[0]),new IntWritable(Integer.parseInt(arr[1])));

    }
}
package com.cr.skew1_stage2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SkewReducer1 extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
       for(IntWritable iw : values){
           count += iw.get();
       }
       context.write(key,new IntWritable(count));
    }
}
package com.cr.skew1_stage2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("SkewApp2");                 //设置job名称
        job.setJarByClass(SkewApp2.class);              //设置搜索类
        job.setInputFormatClass(TextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\skew\out\part-r-00000")));
        FileInputFormat.addInputPath(job,new Path(("D:\skew\out\part-r-00001")));
        FileInputFormat.addInputPath(job,new Path(("D:\skew\out\part-r-00002")));
        //设置输出路径
        Path path = new Path("D:\skew\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper2.class);               //设置mapper类
        job.setReducerClass(SkewReducer1.class);               //设置reduecer类

        job.setMapOutputKeyClass(Text.class);            //设置之map输出key
        job.setMapOutputValueClass(IntWritable.class);   //设置map输出value

        job.setOutputKeyClass(Text.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
world3	3
world1	3
world4	3
hello	15
world2	3
world5	3
可以看到这里的结果和上面没有使用分区函数的结果是一样的

4、如果在stage2阶段将job输入格式转为KeyValueTextInputForma

就可以直接将第一阶段的输出作为key-value,而不用进行切割了
package com.cr.skew1_stage_version2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("SkewApp2");                 //设置job名称
        job.setJarByClass(SkewApp2.class);              //设置搜索类
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\skew\out\part-r-00000")));
        FileInputFormat.addInputPath(job,new Path(("D:\skew\out\part-r-00001")));
        FileInputFormat.addInputPath(job,new Path(("D:\skew\out\part-r-00002")));
        //设置输出路径
        Path path = new Path("D:\skew\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper2.class);               //设置mapper类
        job.setReducerClass(SkewReducer1.class);               //设置reduecer类

        job.setMapOutputKeyClass(Text.class);            //设置之map输出key
        job.setMapOutputValueClass(IntWritable.class);   //设置map输出value

        job.setOutputKeyClass(Text.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
查看源码可知
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }
这里的mapper输入为<text,text>类型
package com.cr.skew1_stage_version2;

import org.apache.commons.httpclient.methods.multipart.Part;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SkewMapper2 extends Mapper<Text,Text, Text,IntWritable> {

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");

        context.write(key,new IntWritable(Integer.parseInt(value.toString())));

    }
}
这里的reducer不变
发现结果和上面也是一摸一样的,所以换成job的输入格式为KeyValueTextInputFormat,可以省很多事
欢迎关注我的公众号:小秋的博客 CSDN博客:https://blog.csdn.net/xiaoqiu_cr github:https://github.com/crr121 联系邮箱:rongchen633@gmail.com 有什么问题可以给我留言噢~
原文地址:https://www.cnblogs.com/flyingcr/p/10326938.html