Spark 自定义分区及区内二次排序demo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner

/**
 * Demo: custom Spark partitioning followed by a secondary sort inside each partition.
 *
 * Every input line is split on a single space; field 0 becomes the key and field 1 is
 * parsed as an `Int` value. The pairs are routed with `MySparkPartition`, each partition
 * is then sorted by key ascending / value descending, and the result is saved as text.
 *
 * Usage: `Demo [inputPath [outputPath]]` - both paths fall back to the demo defaults.
 */
object Demo {
  // Forward slashes are used on purpose: with single backslashes the compiler would read
  // the "t" escape as a TAB character and reject the "s" / "o" escapes as invalid.
  private val DefaultInputPath = "F:/test/test/ssort.txt"
  private val DefaultOutputPath = "F:/test/test/output"

  /**
   * Runs the partition-then-sort job on a local Spark master.
   *
   * @param args optional overrides: `args(0)` = input file, `args(1)` = output directory
   */
  def main(args: Array[String]): Unit = {
    val inputPath = if (args.length > 0) args(0) else DefaultInputPath
    val outputPath = if (args.length > 1) args(1) else DefaultOutputPath

    val conf = new SparkConf().setMaster("local[*]").setAppName("app")
    val sc = new SparkContext(conf)
    try {
      val data = sc.textFile(inputPath)

      // Partition first, then sort inside each partition.
      data.map { line =>
        // Assumes every line is "<key> <int>"; a malformed line fails the task.
        val arr = line.split(" ")
        (arr(0), arr(1).toInt)
      }.partitionBy(new MySparkPartition(2)).mapPartitions { records =>
        // This is a plain Scala List sort executed per partition - it is NOT RDD.sortBy.
        // The whole partition is materialised in memory before sorting.
        // Key ascending, value descending; an explicit reversed Ordering is used instead of
        // negating the value, because negation overflows for Int.MinValue.
        records.toList
          .sorted(Ordering.Tuple2(Ordering.String, Ordering.Int.reverse))
          .iterator
      }.saveAsTextFile(outputPath)

      // ---- Alternative approaches kept for reference (disabled) ----

      // Word count, then the three most frequent words:
      //data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).top(3)(Ordering.by(_._2)).foreach(println)

      // Global secondary sort by mapping every line to a SecondarySortKey first:
      /*data.map{x=>
        (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
      }.sortBy(x=>x, true).map{x=>(x.first, x.second)}.foreach(println)*/

      // Global secondary sort of the raw lines, using SecondarySortKey only as the sort key:
      /*data.sortBy({x=>
        (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
      }, true).foreach(println)*/

      // Sort globally first, then partition and save:
      /*val data1 = data.sortBy({x=>
        (new SecondarySortKey(x.split(" ")(0),x.split(" ")(1).toInt))
      },true).map{x=>
        val arr = x.split(" ")
        (arr(0), arr(1))
      }.partitionBy(new MySparkPartition(2)).saveAsTextFile("F:/test/test/output")*/

      // The same ordering demonstrated on a local Scala List:
      /*val l1 = List[(String,Int)](("a",1),("b",2),("d",4),("c",3),("a",2))
      //l1.sortBy(x=>(x._1,x._2))(Ordering.Tuple2(Ordering.String,Ordering.Int.reverse))
      l1.sortBy{case(x,y) =>
        (x, -y)
      }
      .foreach(println)*/
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }
}

/**
 * Partitioner that isolates the key "aa" in partition 1 and sends every other key to
 * partition 0.
 *
 * @param numsPartitions total number of partitions reported to Spark; must not be negative
 */
class MySparkPartition(numsPartitions: Int) extends Partitioner {
  require(numsPartitions >= 0, s"Number of partitions ($numsPartitions) cannot be negative.")

  /** Number of partitions this partitioner produces. */
  override def numPartitions: Int = numsPartitions

  /**
   * Maps a key to its partition index.
   *
   * @param key record key (any type; only the string "aa" is treated specially)
   * @return 1 for "aa" when a second partition exists, otherwise 0
   */
  override def getPartition(key: Any): Int =
    // Guard on the partition count: returning 1 with fewer than two partitions would be
    // an out-of-range index.
    if (key == "aa" && numPartitions > 1) 1 else 0

  // Two instances with the same partition count route keys identically, so they are equal;
  // this lets Spark recognise RDDs that are already partitioned the same way.
  override def equals(other: Any): Boolean = other match {
    case that: MySparkPartition => that.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

/**
 * Composite sort key: orders by `first` ascending and, for equal `first`, by `second`
 * descending.
 *
 * @param first  primary component, compared with `String.compareTo`
 * @param second secondary component, compared in reverse numeric order
 */
class SecondarySortKey(val first: String, val second: Int)
    extends Ordered[SecondarySortKey] with Serializable {

  /**
   * @param other key to compare against
   * @return the `String.compareTo` result for `first` when it is non-zero; otherwise
   *         -1, 0 or 1 from comparing `other.second` with this key's `second`
   */
  def compare(other: SecondarySortKey): Int =
    first.compareTo(other.first) match {
      // Same primary component: operands are swapped so larger `second` sorts first.
      case 0 => Integer.compare(other.second, second)
      case primaryDiff => primaryDiff
    }
}

  

原文地址:https://www.cnblogs.com/shuzhiwei/p/11322463.html