hadoop程序MapReduce之DataDeduplication

需求:去掉文件中重复的数据。

样板:data.log 

        2016-3-1 a

        2016-3-2 b

        2016-3-2 c

        2016-3-2 b

输出结果: 2016-3-1 a

               2016-3-2 b

               2016-3-2 c

解决思路:取出一行数据,经过mapper处理后,利用MapReduce默认的将相同的key合并后交给reduce处理的原则,这样可以达到数据去重解决问题。

MapReduce分析设计:

Mapper分析设计:

1、<k1,v1>,k1代表:每行数据的行号,v1代表:一行数据。

2、<k2,v2>,k2代表:一行数据,v2代表:就这里可以设置为空值。

Reduce分析设计:

3、<k3,v3>,k3代表:相同的一行数据,v3代表:空值。

4、统计分析输出<k4,v4>,k4代表:相同的一行数据,v4代表:空值。

程序部分:

DataMapper类

package com.cn.DataDeduplication;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DataMapper extends Mapper<Object, Text, Text, Text>{
    Text line = new Text();
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        line = value; 
        context.write(line, new Text(""));
    }
}

DataReduce类

package com.cn.DataDeduplication;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DataReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, new Text(""));
    }
}

DataDeduplication类:

package com.cn.DataDeduplication;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * 数据去重
 * @author root
 *
 */
public class DataDeduplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
           System.err.println("Usage: wordcount  ");
           System.exit(2);
        }
        //创建一个job
        Job job = new Job(conf, "data deduplication");
        
        //设置运行的jar
        job.setJarByClass(DataDeduplication.class);
        
        //设置输入和输出文件路径
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        
        //设置mapper和reduce处理类
        job.setMapperClass(DataMapper.class);
        job.setReducerClass(DataReduce.class);
        
        //设置输出key-value数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        //提交作业并等待它完成
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
}

补充一点:一个文件切分的时候按照默认64M的数据块原则,启动一个mapper进程。

举例说明:比如data.log有20M,会启动一个mapper进程,data1.log有80M,会将这个文件拆分成64M+16M,所有要启动2个Mapper进程,

              最终这两个文件会启动3个mapper进程。

   

原文地址:https://www.cnblogs.com/xubiao/p/5745300.html