02Hadoop二次排序2

案例:

数据:

股票代码   |     日期     |金额

ILMN,2013-12-05,97.65
GOOD,2013-12-09,1078.14
IBM,2013-12-09,177.46
ILMN,2013-12-09,101.33
ILMN,2013-12-06,99.25,
GOOD,2013-12-06,1069.87
IBM,2013-12-06,177.67
GOOD,2013-12-05,1057.34
GOOD,2013-12-05,10.23
GOOD,2013-12-05,11.43
GOOD,2013-12-05,17.34

要求:把同一个股票代码的记录放在一起,然后按日期和金额降序排列。

效果如下:

思路:在map阶段,构造的key(CompositeKey)是:(股票代码,日期,金额);value(NaturalValue)是(日期,价格)。然后key实现

WritableComparable,实现比较函数这样就可以保证一份数据出来是分区且区内有序的。

然后在shuffle过程中,指定一个key比较器(CompositeKeyComparator),使得排序后key按照先股票代码、再日期、最后金额的顺序排列;key-value是键值对,key按照我们的意愿排好序了,

value也就排好了。

 

 

总的来说:降序什么的都是CompositeKeyComparator来决定的。

 

代码结构:

 (1)key:组合键

  1 package com.book.test1;
  2 import java.io.DataInput;
  3 import java.io.DataOutput;
  4 import java.io.IOException;
  5 
  6 import org.apache.hadoop.io.DoubleWritable;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.io.Writable;
 10 import org.apache.hadoop.io.WritableComparable;
 11 /**
 12  * 这个的作用就是要数据在分区里面有序
 13  */
 14 /**
 15  * 定义组合键:就是可以把自己要比较的字段写入
 16  * @author Sxq
 17  *
 18  */
 19 //必须要时间这个WritableComparable这个类
 20 public class CompositeKey implements Writable, WritableComparable<CompositeKey> {
 21 
 22     // 股票的名字
 23     private Text stockSymbol;
 24     // 日期
 25     private LongWritable timestamp;
 26     private DoubleWritable price;
 27 
 28     
 29     public DoubleWritable getPrice() {
 30         return price;
 31     }
 32     public void setPrice(DoubleWritable price) {
 33         this.price = price;
 34     }
 35     public CompositeKey()
 36     {
 37         
 38     }
 39     public CompositeKey(Text _stockSymbol, LongWritable _timestamp,DoubleWritable _price) {
 40         this.stockSymbol = _stockSymbol;
 41         this.timestamp = _timestamp;
 42         this.price=_price;
 43     }
 44 
 45     
 46 
 47     public Text getStockSymbol() {
 48         return stockSymbol;
 49     }
 50 
 51 
 52     public void setStockSymbol(Text stockSymbol) {
 53         this.stockSymbol = stockSymbol;
 54     }
 55 
 56 
 57 
 58     public LongWritable getTimestamp() {
 59         return timestamp;
 60     }
 61 
 62 
 63 
 64     public void setTimestamp(LongWritable timestamp) {
 65         this.timestamp = timestamp;
 66     }
 67 
 68 
 69 
 70     //读出
 71     public void readFields(DataInput input) throws IOException {
 72         String value1=input.readUTF();
 73         long value2=input.readLong();
 74       this.stockSymbol=new  Text( value1);
 75       this.timestamp=  new LongWritable(value2);
 76       this.price=new DoubleWritable(input.readDouble());
 77     }
 78     
 79    //写入
 80     
 81     //@Override
 82     public void write(DataOutput output) throws IOException {
 83         output.writeUTF(this.stockSymbol.toString());
 84         output.writeLong(this.timestamp.get());
 85         output.writeDouble(this.price.get());
 86     }
 87     
 88     public int compareTo(CompositeKey other) {
 89         
 90            int comparator=this.stockSymbol.compareTo(other.stockSymbol);
 91             if(comparator==0)
 92             {
 93                 comparator=this.timestamp.compareTo(other.timestamp);
 94             }
 95         
 96         //升序
 97         //return comparator;
 98     
 99     return -comparator;
100     }
101 
102 
103     @Override
104     public String toString() {
105         return "CompositeKey [stockSymbol=" + stockSymbol + ", timestamp=" + timestamp + "]";
106     }
107     
108 }

(2)key对应的value:

package com.book.test1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Value emitted by the mapper: the (timestamp, price) pair for one trade.
 *
 * <p>Note: the "Privce" spelling in the accessor names is preserved because
 * callers (the driver's mapper/reducer) already depend on it.
 */
public class NaturalValue implements Writable {

    private long timestamp; // trade date encoded as a long, e.g. 20131205
    private double price;   // trade amount

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public double getPrivce() {
        return price;
    }

    public void setPrivce(double price) {
        this.price = price;
    }

    /** Deserializes fields in the same order {@link #write} emits them. */
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        price = in.readDouble();
    }

    /** Serializes the timestamp followed by the price. */
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeDouble(price);
    }

}

(3)分区器:

NaturalKeyPartitioner

 

package com.book.test1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 分区:按照邮编分,把邮编相同的放在一起
 * @author Sxq
 */

/**
 * Partitions records by stock symbol only, so every record for the same
 * symbol lands in the same reducer regardless of its date or price.
 */
public class NaturalKeyPartitioner extends Partitioner<CompositeKey, NaturalValue> {

    @Override
    public int getPartition(CompositeKey key, NaturalValue value, int numPartitions) {
        // Mask off the sign bit instead of using Math.abs(): this yields a
        // non-negative partition index for every possible hash code.
        return (key.getStockSymbol().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}

(4)把key排序的比较器:在shuffle过程中用到的

package com.book.test1;

import javax.print.attribute.standard.MediaSize.Other;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 这个类的作用是把组合键排序,使得组合键也有顺序
 * @author Sxq
 *
 */
/**
 * Sort comparator applied during the shuffle: orders composite keys by
 * stock symbol ascending, then date descending, then price descending.
 *
 * <p>Bug fixed: the previous {@code x.compareTo(y) > 0 ? -1 : 1} form never
 * returned 0 for equal dates/prices, making the comparator asymmetric and
 * violating the Comparator contract. Negating the three-way result keeps
 * 0 for equal components.
 */
public class CompositeKeyComparator extends WritableComparator {

    static {
        // Register as the default raw comparator for CompositeKey.
        WritableComparator.define(CompositeKey.class, new CompositeKeyComparator());
    }

    public CompositeKeyComparator() {
        // true => deserialize keys so the object-level compare() below is used.
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey k1 = (CompositeKey) a;
        CompositeKey k2 = (CompositeKey) b;

        // Symbol ascending — keeps equal symbols adjacent.
        int cmp = k1.getStockSymbol().compareTo(k2.getStockSymbol());
        if (cmp != 0) {
            return cmp;
        }
        // Date descending.
        cmp = -k1.getTimestamp().compareTo(k2.getTimestamp());
        if (cmp != 0) {
            return cmp;
        }
        // Price descending.
        return -k1.getPrice().compareTo(k2.getPrice());
    }

}

(5)reduce阶段的分组比较器:

CompositeGroupingComparator

package com.book.test1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 分组:就是在reduce阶段分到一个组;
 * 就是邮编相同的放在一个组里面
 * @author Sxq
 *
 */
/**
 * Grouping comparator for the reduce phase: two keys are considered equal
 * whenever their stock symbols match, so all trades of one symbol are fed
 * to a single reduce() call. Date and price are deliberately ignored here —
 * they only affect the sort order, not the grouping.
 */
public class CompositeGroupingComparator extends WritableComparator {

    public CompositeGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey left = (CompositeKey) a;
        CompositeKey right = (CompositeKey) b;
        return left.getStockSymbol().compareTo(right.getStockSymbol());
    }

}

(6)驱动类:

package com.book.test1;

import java.io.IOException;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the secondary-sort job: groups trades by stock symbol and,
 * within each symbol, emits them sorted by date and amount descending.
 */
public class Cmain {

    /** Maps one CSV line "SYMBOL,yyyy-MM-dd,amount" to (CompositeKey, NaturalValue). */
    static class Map1 extends Mapper<LongWritable, Text, CompositeKey, NaturalValue> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, CompositeKey, NaturalValue>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // skip blank lines
            }
            String[] fields = line.split(",");
            if (fields.length < 3) {
                return; // skip malformed records
            }
            // "2013-12-05" -> 20131205. (The unused DateUtil.getDate() call
            // from the original, referencing an undefined class, is removed.)
            long timestamp = UtilsCmain.DataTranform(fields[1]);
            // Parse the amount once and reuse it for both key and value.
            double amount = Double.parseDouble(fields[2]);

            NaturalValue naturalValue = new NaturalValue();
            naturalValue.setTimestamp(timestamp);
            naturalValue.setPrivce(amount);

            CompositeKey compositeKey = new CompositeKey();
            compositeKey.setStockSymbol(new Text(fields[0]));
            compositeKey.setTimestamp(new LongWritable(timestamp));
            compositeKey.setPrice(new DoubleWritable(amount));

            context.write(compositeKey, naturalValue);
        }

    }

    /** Emits one line per symbol: symbol TAB (date,amount)(date,amount)... */
    static class reduce1 extends Reducer<CompositeKey, NaturalValue, Text, Text> {
        @Override
        protected void reduce(CompositeKey key, Iterable<NaturalValue> values,
                Reducer<CompositeKey, NaturalValue, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Values arrive already ordered (date desc, amount desc) thanks to
            // the sort comparator; simply concatenate them.
            StringBuilder sb = new StringBuilder();
            for (NaturalValue v : values) {
                sb.append('(').append(v.getTimestamp())
                  .append(',').append(v.getPrivce()).append(')');
            }
            context.write(new Text(key.getStockSymbol()), new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Cmain.class);

        job.setMapperClass(Map1.class);
        job.setReducerClass(reduce1.class);

        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(NaturalValue.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Full shuffle ordering: symbol asc, date desc, amount desc.
        job.setSortComparatorClass(CompositeKeyComparator.class);
        // Group by symbol only, so one reduce() call sees all of a symbol's trades.
        job.setGroupingComparatorClass(CompositeGroupingComparator.class);
        // Partition by symbol so equal symbols reach the same reducer.
        job.setPartitionerClass(NaturalKeyPartitioner.class);

        // Input file and output directory.
        FileInputFormat.setInputPaths(job, new Path("/Users/mac/Desktop/stock.txt"));

        FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

(7)工具类:将2012-12-09转为20121209这种形式:

package com.book.test1;

/**
 * Utility class for the driver: date-string conversion.
 */
public class UtilsCmain {

    private UtilsCmain() {
        // Utility class — not instantiable.
    }

    /**
     * Converts a dash-separated date such as "2013-12-05" into its numeric
     * form, e.g. 20131205.
     *
     * @param value date in yyyy-MM-dd form
     * @return the date digits as a long (yyyyMMdd)
     * @throws NumberFormatException if the text without dashes is not numeric
     */
    public static long DataTranform(String value) {
        // Dropping the dashes leaves exactly the yyyyMMdd digits.
        return Long.parseLong(value.replace("-", ""));
    }

}

运行结果:

原文地址:https://www.cnblogs.com/shenxiaoquan/p/8671702.html