hadoop集群运行dedup实现去重功能

一、配置开发环境
1.我们用到的IDE是eclipse。要用它进行hadoop编程,要给eclipse安装hadoop自带的插件。(有的版本以源码提供插件,需要用户根据需要自己编译)
2.用到的eclipse版本是:eclipse-jee-indigo-SR2-linux-gtk.tar.gz
hadoop的版本是:hadoop-1.0.4.tar.gz
hadoop1.0.4提供的插件就是源码形式。我从网上下载了一个已经编译好的插件,其具体名称为:hadoop-eclipse-plugin-1.0.4.jar
3.把hadoop自带的插件或者自己编译的插件放到eclipse/plugins/目录下面。然后重启eclipse,有时插件加载可能失败,这时可以用命令行eclipse -clean启动。
如果插件安装成功的话,点击file->new->project。此时就会出现Map/Reduce project这个项目。
4.然后在eclipse里面点击window->preferences->Hadoop MapReduce。右边出现hadoop installation directory:在后面的输入框中输入hadoop的安装目录。依次点击apply->ok就可以啦。
5.在eclipse中,点击window->show view->other->MapReduce Tools->Map/Reduce Locations,再点ok。接下来,在eclipse的正下方会出现Map/Reduce Locations。在空白出右击,然后选new hadoop location。接着出现define hadoop location对话框。然后在location name处随意给个名字。在Map/Reduce Master处根据HADOOP_HOME/conf/mapred-site.xml中的信息进行填写。在DFS Master处根据HADOOP_HOME/conf/core-site.xml。如果core-site.xml下只有localhost而没有端口号,默认的就是8020,设置好后finish就可以啦。
上面这些设置完成以后,就可以在project explorer的DFS Location里面看见分布式文件系统的目录结构了。

二、建立hadoop项目-----Dedup.java
1.在eclipse中点击file->new ->Map/Reduce project。填上project name以后,然后直接finish即可。
2.在src/目录下建立一个类。Dedup.java相应的包名称为:com.hadoop.test。
3.按照程序中给定的输入目录(有时在运行时通过参数进行指定),在HDFS中创建相应的输入目录,然后把相关的输入文件传送到那个输入目录当中去。当程序运行之前输出目录不能存在,否则会出现错误信息。在HDFS中默认的当前工作目录是/user/root/。如果指定目录时直接使用目录名,那么input_folder and output_folder都应当在/user/root/下面出现。
4.在Dedup.java界面中右击选择Run As ->Run on hadoop。如果已经有hadoop location的话,选择choose an existing server from the list belowl.否则define a new hadoop server location.弹出define hadoop location对话框,具体操作上面已经提到。最后点击finish就可以完成相关操作。从而运行hadoop程序。

三、注意事项
1.如果new一个java project。编写相应hadoop程序,并把相应的jar包加到classpath当中去。然后点击运行run as->java application的话,项目中涉及到的输入和输出目录将都是本地目录。而且默认的当前工作目录是:System.out.println(System.getProperty("user.dir"))输出的目录。此时相应的输入和输出目录都在这个当前工作目录下。

四、本文提到的Dedup.java源码如下:
package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {

    // map将输入中的value复制到输出数据的key上,并直接输出

    public static class Map extends Mapper {

        private static Text line = new Text();// 每行数据

        // 实现map函数

        public void map(Object key, Text value, Context context)

        throws IOException, InterruptedException {

            line = value;

            context.write(line, new Text(""));

        }

    }

    // reduce将输入中的key复制到输出数据的key上,并直接输出

    public static class Reduce extends Reducer {

        // 实现reduce函数

        public void reduce(Text key, Iterable values, Context context)

        throws IOException, InterruptedException {

            context.write(key, new Text(""));

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] ioArgs = new String[] { "dedup_in", "dedup_out" };

        // String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
        // .getRemainingArgs();
        //
        // if (otherArgs.length != 2) {
        //
        // System.err.println("Usage: Data Deduplication ");
        //
        // System.exit(2);
        //
        // }

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "Data Deduplication");

        job.setJarByClass(Dedup.class);

        // 设置Map、Combine和Reduce处理类

        job.setMapperClass(Map.class);

        job.setCombinerClass(Reduce.class);

        job.setReducerClass(Reduce.class);

        // 设置输出类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        // 设置输入和输出目录

        // FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        //
        // FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        FileInputFormat.addInputPath(job, new Path(ioArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(ioArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

同时也可以看看这篇文章:http://blog.sina.com.cn/s/blog_7deb436e0101kh0d.html
 
 
 
 
原文地址:https://www.cnblogs.com/qiaoyanlin/p/6820235.html