A MapReduce implementation of the classic "people who viewed this item also viewed" feature

Reposted from: http://blog.csdn.net/u011750989/article/details/12004065

The approach looks solid; it is one way to implement collaborative filtering.

Note: the field separator is the byte 0x01 (ASCII code 1, SOH, "start of heading"). In vi you can type it with Ctrl+V followed by Ctrl+A. Other common separators are the tab (ASCII 9) and the space (ASCII 32).
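As a minimal sketch of splitting such a line in Java (the record below is illustrative; the real logs carry the columns the first mapper expects):

    // "\001" is the Java octal escape for the 0x01 (SOH) field separator.
    String line = "2013-09-01\001cookie42\001x\001house7\001beijing"; // date, cookie, ..., houseid, city (illustrative)
    String[] fields = line.split("\001"); // fields[0] = "2013-09-01", fields[1] = "cookie42", ...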

Input:

date ... cookie id ... item id

xx xx xx

Output:

item id    list of item ids (sorted by priority, comma-separated)

xx xx

For example:

id1 id3,id0,id4,id2

id2 id0,id5

The whole computation takes four steps.

1. Extract the (date, cookie id, item id) triples from the raw logs, processing one day at a time. The output format is:

item id-0    item id-1

xx    xx

One optimization here: item id-0 is always the smaller of the two ids, which halves the storage; the transposed pairs are added back when the data is aggregated at the end (a sketch of this canonical ordering follows below).

The reducer does the local sorting and de-duplication.
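A minimal sketch of that canonical ordering (the helper name is illustrative, not from the code below; in the first job the pairs fall out already ordered because the shuffle sorts each user's item list):

    // Store each co-viewed pair only once, smaller id first;
    // step 3 re-emits the transposed (b, a) pair to restore the full matrix.
    static String[] canonicalPair(String a, String b) {
        return a.compareTo(b) <= 0 ? new String[]{a, b} : new String[]{b, a};
    }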

2. Aggregate the step-1 output, still one day at a time:

item id-0    item id-1    association score (the number of users who viewed both items)

xx    xx    xx

3. Aggregate the last three months of data, applying a time decay: the older a record is, the less its association score contributes (sketched below). The output is the association score for every pair of items, including the transposed pairs.
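The decay used in the third job below is factor = 1 / (1 + (day - 1) / 10), where day is the record's age in days: a fresh record counts fully, a ten-day-old one about half, a ninety-day-old one about a tenth. A sketch of the same formula:

    // factor = 1 / (1 + (day - 1) / 10):
    //   day = 1  -> 1.0
    //   day = 11 -> 0.5
    //   day = 91 -> 0.1
    static float decay(int day) {
        return 1f / (1f + (day - 1) / 10f);
    }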

4. A row-to-column transformation produces the final recommendation data, with each item's related items sorted by association score.

First MR

    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.log4j.Logger;

    /*
     * Input: raw log data, may contain duplicates
     *   date  cookie  house id
     *
     * Output:
     *   date  houseid1  houseid2   // houseid1 is always smaller than houseid2; grouped by (date, cookie)
     */
    public class HouseMergeAndSplit {

        // Partition by (date, cookie) so that one cookie's records for one day
        // all reach the same reducer.
        public static class Partitioner1 extends Partitioner<TextPair, Text> {
            @Override
            public int getPartition(TextPair key, Text value, int numPartition) {
                return Math.abs((new Text(key.getFirst().toString() + key.getSecond().toString())).hashCode() * 127) % numPartition;
            }
        }

        // Grouping comparator: group by (date, cookie) only, ignoring the third field,
        // so one reduce call sees every house a cookie viewed on one day.
        public static class Comp1 extends WritableComparator {
            public Comp1() {
                super(TextPair.class, true);
            }
            @SuppressWarnings("unchecked")
            public int compare(WritableComparable a, WritableComparable b) {
                TextPair t1 = (TextPair) a;
                TextPair t2 = (TextPair) b;
                int comp = t1.getFirst().compareTo(t2.getFirst());
                if (comp != 0)
                    return comp;
                return t1.getSecond().compareTo(t2.getSecond());
            }
        }

        public static class TokenizerMapper
                extends Mapper<LongWritable, Text, TextPair, Text> {
            Text val = new Text("test");
            public void map(LongWritable key, Text value, Context context
                            ) throws IOException, InterruptedException {
                String s[] = value.toString().split("\001"); // fields are separated by 0x01 (SOH), see the note above
                TextPair tp = new TextPair(s[0], s[1], s[4] + s[3]); // date, cookie, city+houseid
                context.write(tp, val);
            }
        }

        public static class IntSumReducer
                extends Reducer<TextPair, Text, Text, Text> {
            private static String comparedColumn[] = new String[3];
            ArrayList<String> houselist = new ArrayList<String>();
            private static Text keyv = new Text();
            private static Text valuev = new Text();
            static Logger logger = Logger.getLogger(HouseMergeAndSplit.class.getName());

            public void reduce(TextPair key, Iterable<Text> values,
                               Context context
                               ) throws IOException, InterruptedException {
                houselist.clear();
                String thedate = key.getFirst().toString();
                String cookie = key.getSecond().toString();
                for (int i = 0; i < 3; i++)
                    comparedColumn[i] = "";
                // (first, second) is the grouping key; a new group triggers a new reduce call.
                // Iterating the values advances the key, so key.getThree() changes as we go.
                for (Text val : values) {
                    if (thedate.equals(comparedColumn[0]) && cookie.equals(comparedColumn[1]) && !key.getThree().toString().equals(comparedColumn[2])) {
                        // same (date, cookie), new house id: record it (consecutive duplicates are skipped)
                        houselist.add(key.getThree().toString());
                        comparedColumn[0] = key.getFirst().toString();
                        comparedColumn[1] = key.getSecond().toString();
                        comparedColumn[2] = key.getThree().toString();
                    }
                    if (!thedate.equals(comparedColumn[0]) || !cookie.equals(comparedColumn[1])) {
                        // first record of the group
                        houselist.add(key.getThree().toString());
                        comparedColumn[0] = key.getFirst().toString();
                        comparedColumn[1] = key.getSecond().toString();
                        comparedColumn[2] = key.getThree().toString();
                    }
                }
                keyv.set(comparedColumn[0]); // the date
                // Emit every pair of houses this cookie viewed on this date;
                // houselist is already sorted, so the smaller id always comes first.
                for (int i = 0; i < houselist.size() - 1; i++) {
                    for (int j = i + 1; j < houselist.size(); j++) {
                        valuev.set(houselist.get(i) + " " + houselist.get(j));
                        context.write(keyv, valuev);
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseMergeAndSplit <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseMergeAndSplit");
            job.setNumReduceTasks(4);
            job.setJarByClass(HouseMergeAndSplit.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setMapOutputKeyClass(TextPair.class);
            job.setMapOutputValueClass(Text.class);
            // custom partitioner
            job.setPartitionerClass(Partitioner1.class);
            // group by (date, cookie) after partitioning
            job.setGroupingComparatorClass(Comp1.class);
            // reducer and its output types
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

TextPair

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    // A three-field composite key: (date, cookie, city+houseid).
    public class TextPair implements WritableComparable<TextPair> {
        private Text first;
        private Text second;
        private Text three;

        public TextPair() {
            set(new Text(), new Text(), new Text());
        }
        public TextPair(String first, String second, String three) {
            set(new Text(first), new Text(second), new Text(three));
        }
        public TextPair(Text first, Text second, Text three) {
            set(first, second, three);
        }
        public void set(Text first, Text second, Text three) {
            this.first = first;
            this.second = second;
            this.three = three;
        }
        public Text getFirst() {
            return first;
        }
        public Text getSecond() {
            return second;
        }
        public Text getThree() {
            return three;
        }
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
            three.write(out);
        }
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
            three.readFields(in);
        }
        public int compareTo(TextPair tp) {
            int cmp = first.compareTo(tp.first);
            if (cmp != 0) {
                return cmp;
            }
            cmp = second.compareTo(tp.second);
            if (cmp != 0) {
                return cmp;
            }
            return three.compareTo(tp.three);
        }
    }
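A design note: both jobs that shuffle TextPair install a custom Partitioner, so the default HashPartitioner's dependence on hashCode() never comes into play. If TextPair were ever shuffled with default partitioning, it would also need hashCode/equals; a conventional sketch (not in the original class):

    // Only needed if TextPair is partitioned by the default HashPartitioner.
    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode() * 31 + three.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second) && three.equals(tp.three);
        }
        return false;
    }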


TextPairSecond

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    // A (house id, weight) composite key used by the fourth job.
    public class TextPairSecond implements WritableComparable<TextPairSecond> {
        private Text first;
        private FloatWritable second;

        public TextPairSecond() {
            set(new Text(), new FloatWritable());
        }
        public TextPairSecond(String first, float second) {
            set(new Text(first), new FloatWritable(second));
        }
        public TextPairSecond(Text first, FloatWritable second) {
            set(first, second);
        }
        public void set(Text first, FloatWritable second) {
            this.first = first;
            this.second = second;
        }
        public Text getFirst() {
            return first;
        }
        public FloatWritable getSecond() {
            return second;
        }
        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }
        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }
        public int compareTo(TextPairSecond tp) {
            int cmp = first.compareTo(tp.first);
            if (cmp != 0) {
                return cmp;
            }
            return second.compareTo(tp.second);
        }
    }

Second MR

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Count how often each pair of houses is co-viewed.
     * Input:
     *   date  house1  house2
     *
     * Output:
     *   date  house1  house2  co-occurrence count
     */
    public class HouseCount {

        public static class TokenizerMapper
                extends Mapper<LongWritable, Text, Text, IntWritable> {
            IntWritable iw = new IntWritable(1);
            public void map(LongWritable key, Text value, Context context
                            ) throws IOException, InterruptedException {
                // The whole line "date house1 house2" is the key; classic word count.
                context.write(value, iw);
            }
        }

        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
                               ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable iw : values) {
                    sum += iw.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseCount <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseCount");
            job.setNumReduceTasks(2);
            job.setJarByClass(HouseCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // reducer and its output types
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
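Because this reduce is a plain associative sum, the same class could also run map-side as a combiner to cut shuffle traffic; one optional line in main (not in the original):

    // IntSumReducer's sum is associative and commutative, so it is combiner-safe.
    job.setCombinerClass(IntSumReducer.class);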


Third MR

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Aggregate the co-occurrence counts of the last three months with a decay
     * factor, and finally also emit each pair (a, b) transposed as (b, a).
     * Input:
     *   date  house1  house2  co-occurrence count
     *
     * Output:
     *   house1  house2  weighted co-occurrence (the decay factor differs per day)
     */
    public class HouseCountHz {

        public static class HouseCountHzMapper
                extends Mapper<LongWritable, Text, Text, FloatWritable> {
            Text keyv = new Text();
            FloatWritable valuev = new FloatWritable();
            public void map(LongWritable key, Text value, Context context
                            ) throws IOException, InterruptedException {
                String[] s = value.toString().split(" ");
                keyv.set(s[1] + " " + s[2]); // house1, house2
                Calendar date1 = Calendar.getInstance();
                Calendar d2 = Calendar.getInstance();
                Date b = null;
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                try {
                    b = sdf.parse(s[0]);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                d2.setTime(b);
                long n = date1.getTimeInMillis();
                long birth = d2.getTimeInMillis();
                long sss = n - birth;
                int day = (int) (sss / (3600 * 24 * 1000)); // days between this record's date and today
                float factor = 1 / (1 + (float) (day - 1) / 10); // decay factor
                valuev.set(Float.parseFloat(s[3]) * factor);
                context.write(keyv, valuev);
            }
        }

        public static class HouseCountHzReducer
                extends Reducer<Text, FloatWritable, Text, FloatWritable> {
            FloatWritable result = new FloatWritable();
            Text keyreverse = new Text();
            public void reduce(Text key, Iterable<FloatWritable> values,
                               Context context
                               ) throws IOException, InterruptedException {
                float sum = 0;
                for (FloatWritable iw : values) {
                    sum += iw.get();
                }
                result.set(sum);
                // Emit both (a, b) and the transposed (b, a) so step 4 sees the full matrix.
                String[] keys = key.toString().split(" ");
                keyreverse.set(keys[1] + " " + keys[0]);
                context.write(key, result);
                context.write(keyreverse, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseCountHz <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseCountHz");
            job.setNumReduceTasks(2);
            job.setJarByClass(HouseCountHz.class);
            job.setMapperClass(HouseCountHzMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FloatWritable.class);
            // reducer and its output types
            job.setReducerClass(HouseCountHzReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FloatWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
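The Calendar/SimpleDateFormat arithmetic above predates java.time; a minimal modern equivalent of the day-difference computation, assuming the same yyyy-MM-dd input format:

    import java.time.LocalDate;
    import java.time.temporal.ChronoUnit;

    // Age of the record in whole days (a java.time replacement for the Calendar code above).
    static int ageInDays(String date) { // date in yyyy-MM-dd
        return (int) ChronoUnit.DAYS.between(LocalDate.parse(date), LocalDate.now());
    }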


Fourth MR

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Input:
     *   house1  house2  weighted co-occurrence
     *
     * Output:
     *   house1  house2,house3,house4  (sorted by weight, descending)
     */
    public class HouseRowToCol {

        // Partition by house id only, so all weights for one house reach the same
        // reducer (hashing the weight as well would split a group across reducers).
        public static class Partitioner1 extends Partitioner<TextPairSecond, Text> {
            @Override
            public int getPartition(TextPairSecond key, Text value, int numPartition) {
                return Math.abs(key.getFirst().hashCode() * 127) % numPartition;
            }
        }

        // Grouping comparator: reduce once per house1.
        public static class Comp1 extends WritableComparator {
            public Comp1() {
                super(TextPairSecond.class, true);
            }
            @SuppressWarnings("unchecked")
            public int compare(WritableComparable a, WritableComparable b) {
                TextPairSecond t1 = (TextPairSecond) a;
                TextPairSecond t2 = (TextPairSecond) b;
                return t1.getFirst().compareTo(t2.getFirst());
            }
        }

        // Sort comparator: house1 ascending, then weight descending, so the reducer
        // receives the related houses in order of decreasing weight.
        public static class KeyComp extends WritableComparator {
            public KeyComp() {
                super(TextPairSecond.class, true);
            }
            @SuppressWarnings("unchecked")
            public int compare(WritableComparable a, WritableComparable b) {
                TextPairSecond t1 = (TextPairSecond) a;
                TextPairSecond t2 = (TextPairSecond) b;
                int comp = t1.getFirst().compareTo(t2.getFirst());
                if (comp != 0)
                    return comp;
                return -t1.getSecond().compareTo(t2.getSecond()); // descending by weight
            }
        }

        public static class HouseRowToColMapper
                extends Mapper<LongWritable, Text, TextPairSecond, Text> {
            Text houseid1 = new Text();
            Text houseid2 = new Text();
            FloatWritable weight = new FloatWritable();
            public void map(LongWritable key, Text value, Context context
                            ) throws IOException, InterruptedException {
                String s[] = value.toString().split(" ");
                weight.set(Float.parseFloat(s[2]));
                houseid1.set(s[0]);
                houseid2.set(s[1]);
                TextPairSecond tp = new TextPairSecond(houseid1, weight);
                context.write(tp, houseid2);
            }
        }

        public static class HouseRowToColReducer
                extends Reducer<TextPairSecond, Text, Text, Text> {
            Text valuev = new Text();
            public void reduce(TextPairSecond key, Iterable<Text> values,
                               Context context
                               ) throws IOException, InterruptedException {
                Text keyv = key.getFirst();
                // Values arrive sorted by descending weight; join them with commas.
                Iterator<Text> it = values.iterator();
                StringBuilder sb = new StringBuilder(it.next().toString());
                while (it.hasNext()) {
                    sb.append(",").append(it.next().toString());
                }
                valuev.set(sb.toString());
                context.write(keyv, valuev);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: HouseRowToCol <in> <out>");
                System.exit(2);
            }
            FileSystem fstm = FileSystem.get(conf);
            Path outDir = new Path(otherArgs[1]);
            fstm.delete(outDir, true);
            conf.set("mapred.textoutputformat.separator", " "); // separator between key and value in the reduce output
            Job job = new Job(conf, "HouseRowToCol");
            job.setNumReduceTasks(4);
            job.setJarByClass(HouseRowToCol.class);
            job.setMapperClass(HouseRowToColMapper.class);
            job.setMapOutputKeyClass(TextPairSecond.class);
            job.setMapOutputValueClass(Text.class);
            // custom partitioner
            job.setPartitionerClass(Partitioner1.class);
            // group by house1 after partitioning, sort within each group by weight
            job.setGroupingComparatorClass(Comp1.class);
            job.setSortComparatorClass(KeyComp.class);
            // reducer and its output types
            job.setReducerClass(HouseRowToColReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
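One possible refinement: since the values reach the reducer already sorted by descending weight, capping the recommendation list is just a matter of stopping early. A sketch of the reducer's loop with an illustrative cap of 30 (not in the original):

    // Keep at most the top 30 related houses; values are already weight-ordered.
    int kept = 1;
    StringBuilder sb = new StringBuilder(it.next().toString());
    while (it.hasNext() && kept++ < 30) {
        sb.append(',').append(it.next().toString());
    }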
Original post: https://www.cnblogs.com/cl1024cl/p/6205450.html