Spark实现二次排序

一.代码实现

 1 package big.data.analyse.scala.secondsort
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.sql.SparkSession
 5 
 6 /** 二次排序
 7   * Created by zhen on 2019/5/29.
 8   */
 9 class SecondSortByKey(val first:Int,val second:Int) extends Ordered[SecondSortByKey] with Serializable{
10   def compare(other : SecondSortByKey): Int ={
11     if(this.first - other.first != 0){//正序
12       this.first - other.first
13     }else{//倒序
14       other.second - this.second
15     }
16   }
17 }
18 object SecondSortByKey{
19   /**
20     * 设置日志级别
21     */
22   Logger.getLogger("org").setLevel(Level.WARN)
23   def main(args: Array[String]) {
24     val spark = SparkSession
25       .builder()
26       .appName("SecondSortByKey")
27       .master("local[2]")
28       .getOrCreate()
29     val sc = spark.sparkContext
30     val rows = sc.textFile("src/big/data/analyse/scala/secondsort/sort.txt")
31 
32     val pairWithSortByKey = rows
33       .filter(row=>row.split(" ").length==3)//过滤错误的数据
34       .map(row=>{
35         val array = row.split(" ")
36         (new SecondSortByKey(array(0).toInt,array(1).toInt),row)
37     })
38     println("先正序后倒序")
39     pairWithSortByKey
40       .sortByKey(true) // 排序,true:先正序后倒序,false:先倒序后正序
41       .map(map => map._2)
42       .collect()
43       .foreach(println)
44     println("先倒序后正序")
45     pairWithSortByKey
46       .sortByKey(false) // 排序,true:先正序后倒序,false:先倒序后正序
47       .map(map => map._2)
48       .collect()
49       .foreach(println)
50 
51     sc.stop()
52   }
53 }

二.结果

先正序后倒序
1 9 ES
1 8 HBase
2 4 Tachyon日渐成熟
2 3 《黑豹》异军突起
2 3 Radis
3 3 HDFS
3 3 搜索引擎
5 3 spark发布2.4版本,性能提升巨大
5 2 《复仇者联盟3:无限战争》火热上映
6 3 Maven
7 2 Solr
先倒序后正序
7 2 Solr
6 3 Maven
5 2 《复仇者联盟3:无限战争》火热上映
5 3 spark发布2.4版本,性能提升巨大
3 3 HDFS
3 3 搜索引擎
2 3 《黑豹》异军突起
2 3 Radis
2 4 Tachyon日渐成熟
1 8 HBase
1 9 ES

Process finished with exit code 0
原文地址:https://www.cnblogs.com/yszd/p/10947523.html