01 Spark 的 TopN 问题

和 Hadoop 版本的目标一样：给定一份数据，从中取出 TopN。输入文件每行一条记录，格式为 `名称,数值`（例如 `a,100`），数据如下：

目标是取出数值排名前十的记录。

代码如下:

package com.test.book;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkTon {

    /** How many records to keep ("top N"). */
    private static final int TOP_N = 10;

    /** Input read when no path is passed as the first command-line argument. */
    private static final String DEFAULT_INPUT = "/Users/mac/Desktop/TopN2.txt";

    /**
     * Orders (name, score) records by score ascending, breaking ties by name so the result is
     * deterministic. Serializable in case Spark ships it inside a closure.
     */
    private static final class ByScoreThenName
            implements Comparator<Tuple2<String, Integer>>, Serializable {

        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            int byScore = a._2().compareTo(b._2());
            return byScore != 0 ? byScore : a._1().compareTo(b._1());
        }
    }

    /** Creates an empty min-heap (smallest score at the head) sized for {@code n} records. */
    private static PriorityQueue<Tuple2<String, Integer>> newHeap(int n) {
        return new PriorityQueue<Tuple2<String, Integer>>(n + 1, new ByScoreThenName());
    }

    /**
     * Adds {@code record} to {@code heap} and evicts the smallest entry once the heap exceeds
     * {@code n}, so the heap always holds the n largest records offered so far. Unlike a map
     * keyed by score, records with equal scores are all retained.
     */
    private static void offerBounded(PriorityQueue<Tuple2<String, Integer>> heap,
            Tuple2<String, Integer> record, int n) {
        heap.add(record);
        if (heap.size() > n) {
            heap.poll();
        }
    }

    /** Empties {@code heap} into a new list ordered from the lowest to the highest score. */
    private static List<Tuple2<String, Integer>> drainAscending(
            PriorityQueue<Tuple2<String, Integer>> heap) {
        List<Tuple2<String, Integer>> ordered = new ArrayList<Tuple2<String, Integer>>(heap.size());
        while (!heap.isEmpty()) {
            ordered.add(heap.poll());
        }
        return ordered;
    }

    /**
     * Reads "name,score" lines, keeps the TOP_N highest-scoring records and prints their names
     * from the lowest to the highest score among them.
     *
     * @param args optional; {@code args[0]} overrides the input path (defaults to DEFAULT_INPUT)
     */
    public static void main(String[] args) {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;

        SparkConf conf = new SparkConf().setAppName("SparkTon").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(input);

            // Skip blank lines: "".split(",") has no index 1 and would fail the whole job.
            JavaRDD<String> nonBlank = lines.filter(new Function<String, Boolean>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(String line) throws Exception {
                    return !line.trim().isEmpty();
                }
            });

            // Parse each "name,score" line into a (name, score) tuple.
            JavaPairRDD<String, Integer> pairRDD = nonBlank
                    .mapToPair(new PairFunction<String, String, Integer>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Integer> call(String line) throws Exception {
                            String[] fields = line.split(",");
                            return new Tuple2<String, Integer>(fields[0].trim(),
                                    Integer.valueOf(fields[1].trim()));
                        }
                    });

            // Map side: every partition reduces itself to its local top N candidates.
            // NOTE: this is the Spark 1.x FlatMapFunction signature (call returns Iterable);
            // on Spark 2.x+ call must return an Iterator instead.
            JavaRDD<List<Tuple2<String, Integer>>> partitionTops = pairRDD.mapPartitions(
                    new FlatMapFunction<Iterator<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<List<Tuple2<String, Integer>>> call(
                                Iterator<Tuple2<String, Integer>> records) throws Exception {
                            // Local to this call so no state is shared between partitions.
                            PriorityQueue<Tuple2<String, Integer>> heap = newHeap(TOP_N);
                            while (records.hasNext()) {
                                offerBounded(heap, records.next(), TOP_N);
                            }
                            return Collections.singleton(drainAscending(heap));
                        }
                    });

            // Reduce side (driver): merge the per-partition candidates into the global top N.
            PriorityQueue<Tuple2<String, Integer>> global = newHeap(TOP_N);
            for (List<Tuple2<String, Integer>> localTop : partitionTops.collect()) {
                for (Tuple2<String, Integer> record : localTop) {
                    offerBounded(global, record, TOP_N);
                }
            }

            // Print names from the lowest to the highest score among the top N.
            for (Tuple2<String, Integer> record : drainAscending(global)) {
                System.out.println(record._1());
            }
        } finally {
            // Release the context even if the job fails.
            sc.stop();
        }
    }

}

结果:

原文地址:https://www.cnblogs.com/shenxiaoquan/p/8697796.html