Hadoop: Implementing Total Order Sort with a Sampler (java.io.EOFException error)

With a sampler, MapReduce can automatically split the data into partition ranges based on the probability distribution of the keys, so that the output is globally sorted. Previously we set the partition boundaries by hand to divide the data into partitions; here the boundaries are derived automatically from the data.


Below we use Hadoop's built-in total order partitioner class, TotalOrderPartitioner, to do the partitioning automatically.

1. The Mapper class

package com.cr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTempMapper extends Mapper<LongWritable, IntWritable, LongWritable, IntWritable> {
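    // Identity mapper: forwards each (year, temperature) record straight to the shuffle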

    @Override
    protected void map(LongWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

2. The Reducer class

package com.cr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTempReducer extends Reducer<LongWritable,IntWritable,LongWritable,IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
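        // keep the largest temperature observed for each year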

        int max = Integer.MIN_VALUE;
        for(IntWritable iw :values){
            max = max > iw.get()? max : iw.get();
        }
        context.write(key,new IntWritable(max));

    }
}

3. The total-order sampler driver class

Two things to note here: create the random sampler object first, then write the partition file, and then set the total order partitioner class.
The other thing to watch: the conf passed to setPartitionFile must come from job.getConfiguration(), not the earlier new Configuration().
package com.cr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import java.io.IOException;

public class MaxTempApp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //create the job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);

        //set the job's properties
        job.setJobName("MaxTempApp");                     //job name
        job.setJarByClass(MaxTempApp.class);              //jar search class
        job.setInputFormatClass(SequenceFileInputFormat.class);

        //set the input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTempMapper.class);          //mapper class
        job.setReducerClass(MaxTempReducer.class);        //reducer class

        job.setMapOutputKeyClass(LongWritable.class);     //map output key type
        job.setMapOutputValueClass(IntWritable.class);    //map output value type
        job.setOutputKeyClass(LongWritable.class);        //job output key type
        job.setOutputValueClass(IntWritable.class);       //job output value type
        //create the random sampler object
        /**
         * RandomSampler parameters:
         * 1: the probability with which each key is selected
         * 6000: total number of samples to draw
         * 3: maximum number of input splits to sample (the number of partitions here)
         */
        InputSampler.Sampler<LongWritable,IntWritable> sampler =
                new InputSampler.RandomSampler<LongWritable, IntWritable>(1,6000,3);
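        // NOTE: these LongWritable type parameters do not match the IntWritable keys in the sequence file; the mismatch is diagnosed below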

        job.setNumReduceTasks(3);                         //set the number of reducers

        //write the sampled keys to the partition file
        /**
         * note: the conf here must come from job.getConfiguration(), not the earlier new Configuration()
         */
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("D:/sample/par.list"));
        //set the total order partitioner class
        job.setPartitionerClass(TotalOrderPartitioner.class);

        InputSampler.writePartitionFile(job,sampler);
        job.waitForCompletion(true);

    }
}

4. Create a sequence file as the input data

package com.cr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class SequenceFile {

    /**
     * Write the test sequence file
     *
     * @throws IOException
     */
    @Test
    public void save() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("D:/sequence/1.seq");
        org.apache.hadoop.io.SequenceFile.Writer writer =
                org.apache.hadoop.io.SequenceFile.createWriter(fs, conf, path, IntWritable.class, IntWritable.class);
        for (int i = 0; i < 6000; i++) {

            int year = 2000 + new Random().nextInt(50);
            int temp = 10 + new Random().nextInt(30);
            writer.append(new IntWritable(year),new IntWritable(temp));
        }
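        // NOTE: writer.close() is never called, which is what later causes the EOFException below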
    }
}
The job fails with java.io.EOFException, unsolved at this point:
Exception in thread "main" java.io.EOFException
	at java.io.DataInputStream.readFully(DataInputStream.java:197)
	at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:70)
	at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:120)
	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2436)
	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2568)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:72)
	at org.apache.hadoop.mapreduce.lib.partition.InputSampler$RandomSampler.getSample(InputSampler.java:222)
	at org.apache.hadoop.mapreduce.lib.partition.InputSampler.writePartitionFile(InputSampler.java:320)
	at com.cr.wordcount.MaxTempApp.main(MaxTempApp.java:57)

If any readers have ideas, suggestions are welcome.

OK, the problem above has been solved. It came down to the following places:

1. The sequence file: the writer was never closed (writer.close()) at the end, so the generated sequence file was corrupt (truncated).
2. When creating the random sampler, LongWritable should be changed to IntWritable, because the input is a sequence file whose keys and values are both int.
3. The mapper's and the job's output key types should likewise be IntWritable rather than LongWritable.
4. Correspondingly, the inputs and outputs inside the mapper and reducer should all be IntWritable as well; the corrected fragments are sketched below.
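Here are the corrected fragments as a minimal sketch (variable names, paths, and the 1/6000/3 sampler settings are kept from the listings above):

//fix 1: close the writer so the sequence file is complete
org.apache.hadoop.io.SequenceFile.Writer writer =
        org.apache.hadoop.io.SequenceFile.createWriter(fs, conf, path, IntWritable.class, IntWritable.class);
for (int i = 0; i < 6000; i++) {
    int year = 2000 + new Random().nextInt(50);
    int temp = 10 + new Random().nextInt(30);
    writer.append(new IntWritable(year), new IntWritable(temp));
}
writer.close();

//fix 2: type the sampler to match the IntWritable keys and values in the sequence file
InputSampler.Sampler<IntWritable, IntWritable> sampler =
        new InputSampler.RandomSampler<IntWritable, IntWritable>(1, 6000, 3);

//fixes 3 and 4: use IntWritable throughout the job and the mapper/reducer signatures
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//public class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> { ... }
//public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { ... }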

This really made my head spin; details like these are something to watch out for in the future.

5. Results

Three partition files are produced in total; each partition's range is generated automatically, with the years in ascending order.
part-r-00000
1950	9972
1951	9871
1952	9923
1953	9978
1954	9975
1955	9955
1956	9929
1957	9886
1958	9940
1959	9905
1960	9876
1961	9968
1962	9920
1963	9894
1964	9778
1965	9913
1966	9977
1967	9900
1968	9959
1969	9940
1970	9967
1971	9943
1972	9816
1973	9922
1974	9682
1975	9944
1976	9967
1977	9878
1978	9827
1979	9724
1980	9876
1981	9906
1982	9974
1983	9968
part-r-00001
1984	9946
1985	9864
1986	9957
1987	9960
1988	9908
1989	9977
1990	9952
1991	9901
1992	9975
1993	9905
1994	9951
1995	9958
1996	9879
1997	9876
1998	9870
1999	9848
2000	9933
2001	9977
2002	9978
2003	9971
2004	9912
2005	9969
2006	9946
2007	9894
2008	9952
2009	9963
2010	9846
2011	9948
2012	9932
2013	9966
2014	9951
2015	9941
2016	9962
part-r-00002
2017	9943
2018	9929
2019	9804
2020	9971
2021	9942
2022	9898
2023	9901
2024	9891
2025	9974
2026	9945
2027	9876
2028	9956
2029	9921
2030	9937
2031	9943
2032	9806
2033	9956
2034	9966
2035	9859
2036	9928
2037	9883
2038	9978
2039	9943
2040	9884
2041	9970
2042	9894
2043	9955
2044	9937
2045	9886
2046	9939
2047	9913
2048	9914
2049	9536
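As a final sanity check, the split points that InputSampler.writePartitionFile wrote to D:/sample/par.list can be dumped with SequenceFile.Reader. This is a minimal sketch under the corrected IntWritable key type; the class name DumpPartitionFile is just for illustration, and it assumes the partition file stores each split-point key with a NullWritable value:

package com.cr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class DumpPartitionFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        //with 3 reducers the file holds 2 split-point keys
        org.apache.hadoop.io.SequenceFile.Reader reader =
                new org.apache.hadoop.io.SequenceFile.Reader(fs, new Path("D:/sample/par.list"), conf);
        IntWritable key = new IntWritable();
        NullWritable value = NullWritable.get();
        while (reader.next(key, value)) {
            System.out.println(key.get());   //the years that separate the three partitions
        }
        reader.close();
    }
}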