DataJoin: Reduce-side Join

First, three concepts:

  1. data source: analogous to a table in a relational database; it may be a single file or multiple files.
  2. tag: tagging each record ensures that source-specific metadata always travels along with the record.
  3. group key: the equivalent of a join key in a relational database.

  Two data sources:

      Customers                           Orders
      1,Stephanie Leung,555-555-5555      3,A,12.95,02-Jun-2008
      2,Edward Kim,123-456-7890           1,B,88.25,20-May-2008
      3,Jose Madriz,281-330-8004          2,C,32.00,30-Nov-2007
      4,David Stork,408-555-0000          3,D,25.02,22-Jan-2009
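Joining these two sources on the first field (the customer ID) produces one output row per matching (customer, order) pair; customer 4 has no orders, so it is dropped by the inner join. A minimal in-memory sketch of the expected result, independent of Hadoop (the class and method names here are illustrative only; in the real job, output order depends on the shuffle's sorting by group key):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpectedJoin {
    // In-memory inner join on the first comma-separated field
    static List<String> join(String[] customers, String[] orders) {
        Map<String, String> byId = new HashMap<>();
        for (String c : customers) {
            String[] kv = c.split(",", 2);
            byId.put(kv[0], kv[1]);          // index customers by ID
        }
        List<String> out = new ArrayList<>();
        for (String o : orders) {
            String[] kv = o.split(",", 2);
            String cust = byId.get(kv[0]);
            if (cust != null) {              // inner join: unmatched rows are dropped
                out.add(kv[0] + "," + cust + "," + kv[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] customers = {
            "1,Stephanie Leung,555-555-5555",
            "2,Edward Kim,123-456-7890",
            "3,Jose Madriz,281-330-8004",
            "4,David Stork,408-555-0000"
        };
        String[] orders = {
            "3,A,12.95,02-Jun-2008",
            "1,B,88.25,20-May-2008",
            "2,C,32.00,30-Nov-2007",
            "3,D,25.02,22-Jan-2009"
        };
        for (String row : join(customers, orders)) {
            System.out.println(row);
        }
    }
}
```

Note that customer 3 appears twice in the output, once per matching order.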

 

The goal of the map phase is to wrap each record from every data source with its tag, producing tagged records like the ones illustrated above.

MapClass extends DataJoinMapperBase (which already implements map()); we need to implement three methods:

  1. Text generateInputTag(String inputFile);  // called from configure(); its result is stored in DataJoinMapperBase's inputTag field
  2. Text generateGroupKey(TaggedMapOutput aRecord);
  3. TaggedMapOutput generateTaggedMapOutput(Object value);

  Inside map(), methods 2 and 3 are called to wrap each record and emit it to reduce(), as shown in the figure below:
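The tagging logic can be sketched without Hadoop. This plain-Java snippet mirrors what generateInputTag() and generateGroupKey() compute in the listing below (the filename `Orders-part0` and the class name are hypothetical examples):

```java
public class TaggingSketch {
    // Mirrors generateInputTag(): the data source name is the
    // part of the input filename before the first '-'
    static String inputTag(String inputFile) {
        return inputFile.split("-")[0];
    }

    // Mirrors generateGroupKey(): the join key is the first
    // comma-separated field of the record
    static String groupKey(String record) {
        return record.split(",")[0];
    }

    public static void main(String[] args) {
        System.out.println(inputTag("Orders-part0"));        // prints: Orders
        System.out.println(groupKey("3,A,12.95,02-Jun-2008")); // prints: 3
    }
}
```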

 

The Reduce class extends DataJoinReducerBase (which already implements reduce()); we need to implement combine().

reduce() takes the records it receives (multiple TaggedMapOutput values sharing the same group key), forms the cross-product of the values across data sources, and calls combine() on each combination.

combine() must be implemented by us; it is where the actual join work happens.
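For group key 3 in the sample data, the reducer receives one Customers record and two Orders records, so the cross-product yields two (customer, order) pairs and combine() runs once per pair. A plain-Java sketch of just the string logic inside combine() (the class and method names are illustrative; the group key itself is re-attached by the framework as the output key, with "," as the separator per the job configuration):

```java
public class CombineSketch {
    // Mirrors combine(): strip the duplicated join key ("3,") from
    // every value in the combination, then concatenate the remainders
    static String combine(String[] values) {
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) joined.append(",");
            joined.append(values[i].split(",", 2)[1]); // drop leading "key,"
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        String customer = "3,Jose Madriz,281-330-8004";
        // Customer 3 has two orders, so the cross-product gives two pairs
        for (String order : new String[]{"3,A,12.95,02-Jun-2008", "3,D,25.02,22-Jan-2009"}) {
            // The framework prepends the group key "3" and a "," separator
            System.out.println("3," + combine(new String[]{customer, order}));
        }
    }
}
```

This prints `3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008` and `3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009`, matching the expected join output.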

The full code is as follows:

The contrib jar contrib\datajoin\hadoop-datajoin-*.jar must be on the classpath.

The old (org.apache.hadoop.mapred) API must be used.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {   

        protected Text generateInputTag(String inputFile) {

            String datasource = inputFile.split("-")[0];

            return new Text(datasource);

        }   

        protected Text generateGroupKey(TaggedMapOutput aRecord) {

            String line = ((Text) aRecord.getData()).toString();

            String[] tokens = line.split(",");

            String groupKey = tokens[0];

            return new Text(groupKey);

        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {

            TaggedWritable retv = new TaggedWritable((Text) value);

            retv.setTag(this.inputTag);

            return retv;

        }

    }

   

    public static class Reduce extends DataJoinReducerBase {   

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {

            if (tags.length < 2) return null; 

            String joinedStr = "";

            for (int i=0; i<values.length; i++) {

                if (i > 0) joinedStr += ",";

                TaggedWritable tw = (TaggedWritable) values[i];

                String line = ((Text) tw.getData()).toString();

                String[] tokens = line.split(",", 2);

                joinedStr += tokens[1];

            }

            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));

            retv.setTag((Text) tags[0]);

            return retv;

        }

    }

   

    public static class TaggedWritable extends TaggedMapOutput {  

        private Writable data;     

        // A no-arg constructor is required: Hadoop creates Writable
        // instances by reflection when deserializing
        public TaggedWritable() {
            this(new Text(""));
        }

        public TaggedWritable(Writable data) {

            this.tag = new Text("");

            this.data = data;

        }  

        public Writable getData() {

            return data;

        }     

        public void write(DataOutput out) throws IOException {

            this.tag.write(out);

            this.data.write(out);

        }    

        public void readFields(DataInput in) throws IOException {

            this.tag.readFields(in);

            this.data.readFields(in);

        }

    }

   

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();      

        JobConf job = new JobConf(conf, DataJoin.class);  

        job.setJobName("DataJoin");   

        Path in = new Path(args[0]);

        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);

        FileOutputFormat.setOutputPath(job, out);      

       

        job.setMapperClass(MapClass.class);

        job.setReducerClass(Reduce.class);

       

        job.setInputFormat(TextInputFormat.class);

        job.setOutputFormat(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(TaggedWritable.class);

        job.set("mapred.textoutputformat.separator", ",");

       

        JobClient.runJob(job);

        return 0;

    }

   

    public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);       

        System.exit(res);

    }

}

Original post: https://www.cnblogs.com/liangzh/p/2508358.html