Hadoop MapReduce: Sorted Top-N (Efficient Pattern)

1. In the map phase, compute each split's local Top-N first; in the reduce phase, merge those candidates and compute the global Top-N once more. Both steps lean on TreeMap's natural key ordering, so no sorting algorithm has to be written by hand.
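The TreeMap trick itself can be seen in isolation before any Hadoop code. A minimal plain-Java sketch (the descending comparator, the cap of 3, and the sample numbers are illustrative only, not from the original post):

import java.util.Map;
import java.util.TreeMap;

public class TreeMapTopNDemo {
    public static void main(String[] args) {
        // With a descending comparator the last key is always the current
        // minimum, so capping the size at N leaves exactly the N largest keys
        TreeMap<Long, String> topN = new TreeMap<>((a, b) -> Long.compare(b, a));
        long[] flows = {4848, 2910, 7335, 1938, 9531};
        for (long flow : flows) {
            topN.put(flow, "phone-" + flow);
            if (topN.size() > 3) {
                topN.remove(topN.lastKey()); // evict the smallest
            }
        }
        for (Map.Entry<Long, String> e : topN.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue()); // 9531, 7335, 4848
        }
    }
}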

2. Sample data, one record per line, similar to the following (the second field is the phone number; the third-from-last and second-from-last fields are the up-flow and down-flow):

1 	13682846555	192.168.100.12	www.qq.com	1938	2910	200

3. Code
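The mapper, reducer, and driver below all reference a FlowBeanSorted key class that the original post does not include. A minimal sketch of what it presumably looks like, assuming a WritableComparable ordered by descending total flow, so that TreeMap.firstKey() is the largest candidate and TreeMap.lastKey() the smallest:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical reconstruction of the bean the post assumes but does not show
public class FlowBeanSorted implements WritableComparable<FlowBeanSorted> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public void setAll(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow()   { return upFlow; }
    public long getDownFlow() { return downFlow; }

    @Override
    public int compareTo(FlowBeanSorted o) {
        // Descending by total flow: larger beans sort first
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}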

3.1 Mapper

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBeanSorted, Text> {
    // A TreeMap holds this split's candidates; its keys stay sorted automatically
    private TreeMap<FlowBeanSorted, Text> flowMap = new TreeMap<>();
    private enum Counters {LINES}

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.getCounter(Counters.LINES).increment(1);
        String line = value.toString();
        String[] fields = line.split("\\s+");
        String phoneNumber = fields[1];
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);

        FlowBeanSorted k = new FlowBeanSorted();
        Text v = new Text();

        k.setAll(upFlow,downFlow);
        v.set(phoneNumber);

        flowMap.put(k,v);

        // Cap the TreeMap at 10 entries: beyond that, evict the entry with the
        // smallest total flow (the last key, since the ordering is descending)
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }

    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this split's local Top-N once the map task has seen every record
        for (Map.Entry<FlowBeanSorted, Text> entry : flowMap.entrySet()) {
            context.write(entry.getKey(), entry.getValue());
        }
    }
}
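One caveat that applies to both the mapper and the reducer below: a TreeMap treats keys that compare equal as the same entry, so two phone numbers with identical total flow collide and the later put() silently replaces the earlier one. The original post does not handle this; if it matters, a tie-breaking field (e.g. the phone number) in FlowBeanSorted.compareTo() would avoid the collision.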

3.2 Reducer

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<FlowBeanSorted, Text, Text, FlowBeanSorted> {
    // A TreeMap holds the merged candidates; its keys stay sorted automatically
    private TreeMap<FlowBeanSorted, Text> flowMap = new TreeMap<>();

    @Override
    protected void reduce(FlowBeanSorted key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {

            // Copy the key: Hadoop reuses the key object across iterations,
            // so each TreeMap entry needs its own instance
            FlowBeanSorted bean = new FlowBeanSorted();
            bean.setAll(key.getUpFlow(), key.getDownFlow());

            // 1. Add the record to the TreeMap
            flowMap.put(bean, new Text(value));

            // 2. Cap the TreeMap at 10 entries: beyond that, evict the entry
            // with the smallest total flow
            if (flowMap.size() > 10) {
                flowMap.remove(flowMap.lastKey());
            }
        }

    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Traverse the merged Top-N and emit it as phone -> flow
        for (Map.Entry<FlowBeanSorted, Text> entry : flowMap.entrySet()) {
            context.write(entry.getValue(), entry.getKey());
        }
    }
}
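Note that the reduce-side merge only yields a global Top-N if every map task's candidates reach the same reducer, so the job must run with a single reduce task. That is the MapReduce default; it can be made explicit in the driver with job.setNumReduceTasks(1).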

3.3 Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {
    public static void main(String[] args) throws Exception {

        // Hardcoded paths for local testing; remove this line to take real CLI arguments
        args = new String[]{"input/phone*.txt", "output/"};

        // Get the configuration and a Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Let Hadoop locate the jar for this job via the driver class
        job.setJarByClass(TopNDriver.class);

        // Set the Mapper and Reducer classes for this job
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        // Set the key/value types of the mapper output
        job.setMapOutputKeyClass(FlowBeanSorted.class);
        job.setMapOutputValueClass(Text.class);

        // Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBeanSorted.class);

        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // Delete the output directory if it already exists,
        // otherwise job submission would fail
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        // Submit the job configuration and its jar to YARN and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}
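To run this against a cluster rather than the hardcoded local paths, drop the args override and submit the packaged jar in the usual way, e.g. hadoop jar topn.jar TopNDriver <input> <output> (the jar name here is only a placeholder).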
 
Original post: https://www.cnblogs.com/asker009/p/11458823.html