分组取topN

假设有这样一个文件,文件内容如下

class1 90
class2 89
class1 87
class1 99
class2 100
class2 95
class1 78
class2 85
class1 92
class2 91

要求按照班级分组取出每个班前三名,源码如下:

package swy.spark.spark_study_java.core;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 分组取top3
 * @author swy
 *
 */

public class GroupTop3 {
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("ActionOperation")
				.setMaster("local");
				//.setMaster("spark://192.168.43.124:7077");
		
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		JavaRDD<String> lines = sc.textFile("E://swy//resource//workspace-neno//spark-study-java//txt//score.txt");
		
		JavaPairRDD<String, Integer> pairs = lines.mapToPair(
				new PairFunction<String, String, Integer>(){
					
					private static final long serialVersionUID = 1L;

					public Tuple2<String, Integer> call(String s) throws Exception {
						String[] lineSpited = s.split(" ");
						return new Tuple2<String, Integer>(lineSpited[0], Integer.valueOf(lineSpited[1]));
					}
			
		});
		
		//将成绩按班级分组
		JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs.groupByKey(); 
		
		JavaPairRDD<String, Iterable<Integer>> top3Sores = groupPairs.mapToPair(
				new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>(){
					
					private static final long serialVersionUID = 1L;
					
					public Tuple2<String, Iterable<Integer>> call (
							Tuple2<String, Iterable<Integer>> classSorces) throws Exception {
						Integer[] top3 = new Integer[3];
						
						String className = classSorces._1;
						Iterator<Integer> scores = classSorces._2.iterator();
						
						while (scores.hasNext()) {
							Integer score = scores.next();
							
							for (int i = 0; i<3; i++) {
								if (top3[i] == null) {
									top3[i] = score;
									break;
								} else if (score > top3[i]) {
									int tmp = top3[i];
									top3[i] = score;
									
									if (i < top3.length - 1) {
										top3[i+1] = tmp;
									}
									break;
								}
							}
						}
						return new Tuple2<String, 
								Iterable<Integer>>(className, Arrays.asList(top3));
					}
		});
		
		top3Sores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
			private static final long serialVersionUID = 1L;
			
			public void call(Tuple2<String, Iterable<Integer>> v) throws Exception {
				System.out.println("班级:" + v._1);
				System.out.println("前三名:" + v._2);
			}
			
		});
		
		sc.close();
	}
} 

topN的排序算法可以这样理解:

假如有三个山洞,一群土匪排着队来抢占山洞,按如下规则占领山洞,下面算法保证第一个山洞主人永远是最厉害的,以此类推

土匪 :待排序序列
三个山洞:top3
妻子:一个临时tmp变量
打架:比较大小
for (土匪 : 队伍) {
    for (山洞 : 三个山洞) {
        if (山洞空) {
            入住;
            break;
        } else {  // 山洞有人:和原主人打架
            if (打赢) {
                入住;
                原主人带着妻子住下一个山洞(假如还有的话),后面山洞的主人也依次往后搬一个,最后一个山洞的原主人被挤出;
                break;
            }
            // 打输:继续去下一个山洞挑战
        }
    }
}

实现:

// Keep top3 sorted in descending order while consuming the scores.
while (scores.hasNext()) {
    Integer score = scores.next();

    for (int i = 0; i < top3.length; i++) {
        // Insert at the first empty slot or the first slot this score beats.
        if (top3[i] == null || score > top3[i]) {
            // Shift every lower-ranked entry down one slot; the last one drops
            // out. Overwriting only top3[i+1] would lose the value stored there.
            for (int j = top3.length - 1; j > i; j--) {
                top3[j] = top3[j - 1];
            }
            top3[i] = score;
            break;
        }
    }
}

  

原文地址:https://www.cnblogs.com/suwy/p/9510430.html