Hadoop MapReduce Example (Data Sorting)

Reference: http://eric-gcm.iteye.com/blog/1807468

file1.txt:

2
32
654
32
15
756
65223

file2.txt:

5956
22
650
92

file3.txt:

26
54
6

Java code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    // map parses each input value into an IntWritable and emits it as the output key
    public static class Map extends
            Mapper<Object, Text, IntWritable, IntWritable> {

        private static IntWritable data = new IntWritable();

        // Implement the map function
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    // reduce copies the input key to the output key, then writes it once per
    // element in the input value list; the global linenum records the key's
    // rank, i.e. its line number in the sorted output
    public static class Reduce extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        // Implement the reduce function
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context)
            throws IOException, InterruptedException {

            for (IntWritable val : values) {

                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // This line is critical: it points the job at the JobTracker
        // (the author's cluster address, using the old mapred property name)
        conf.set("mapred.job.tracker", "172.16.11.74:9001");

        // Note: the input/output paths are hardcoded here rather than taken from args
        String[] ioArgs = new String[] { "sort_in", "sort_out" };
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
                .getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Data Sort <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Data Sort");
        job.setJarByClass(Sort.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Set the output key/value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
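
One caveat about this code: the global linenum counter only produces a single correct sequence if every key reaches the same reduce task. One reduce task is the default here, but on a cluster configured with multiple reducers it is worth pinning explicitly in main (my suggested addition, not part of the original post):

        // Keep all keys in one reducer so linenum forms one global sequence
        job.setNumReduceTasks(1);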

Run results:

1       2
2       6
3       15
4       22
5       26
6       32
7       32
8       54
9       92
10      650
11      654
12      756
13      5956
14      65223

Detailed steps for packaging and running:

See the previous post: http://www.cnblogs.com/-wangjiannan/p/3590324.html

Key points:

  By default, MapReduce sorts records by key.

  If the key is an IntWritable (which wraps an int), MapReduce sorts the keys numerically;

  if the key is a Text (which wraps a String), MapReduce sorts the keys lexicographically.
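
A quick standalone illustration of the two orderings (a minimal sketch, separate from the job above; it only exercises each key type's compareTo method):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class KeyOrderDemo {

    public static void main(String[] args) {

        // IntWritable compares numerically: 92 < 650
        IntWritable ia = new IntWritable(92);
        IntWritable ib = new IntWritable(650);
        System.out.println(ia.compareTo(ib)); // negative: 92 sorts before 650

        // Text compares bytes lexicographically: "92" > "650", since '9' > '6'
        Text ta = new Text("92");
        Text tb = new Text("650");
        System.out.println(ta.compareTo(tb)); // positive: "92" sorts after "650"
    }
}

This is exactly why the run results above go 2, 6, 15, ..., 65223 in numeric order: the job's keys are IntWritable, not Text.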

Understanding the code:

    Map phase:

    1. String line = value.toString();

       Inside the map method, one line of text is handled at a time; the
       framework itself iterates over the input lines and calls map once per
       line (see the parsing note after this list).

    2. context.write(data, new IntWritable(1));

       For each line, the key is data (the line parsed into an IntWritable)
       and the value is an IntWritable 1.

    3. All keys come out of the shuffle already sorted, in ascending order.

       If the same value appears on several lines, the 1-values for that data
       are merged into one list {Values} (one IntWritable 1 per occurrence).
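
A practical note on step 1: Integer.parseInt will throw a NumberFormatException on a blank or otherwise malformed line. A defensive variant of the map method (a sketch under that assumption, not part of the original post):

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

            String line = value.toString().trim();

            // Skip blank lines instead of crashing on parseInt
            if (line.isEmpty()) {
                return;
            }
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }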

    Reduce phase:

    1. reduce(IntWritable key, Iterable<IntWritable> values, Context context)

       For each distinct key: key is the data produced by the map phase, and
       values is the list {Values} collected for that data.

    2. for (IntWritable val : values) { context.write(linenum, key); linenum = new IntWritable(linenum.get() + 1); }

       This loop outputs one line per value, in the form: line number, data.

       The line number increases with every write, so a duplicated value is
       written out on consecutive lines, each with its own line number.
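
For example, 32 appears twice in file1.txt, so the reducer receives key = 32 with values = {1, 1}; the loop body runs twice and writes "6 32" followed by "7 32", which are exactly lines 6 and 7 of the run results above.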

Original article: https://www.cnblogs.com/-wangjiannan/p/3590407.html