Spark (Part 3): Custom Sorting in RDDs

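By default, the RDD operators sortBy and sortByKey can only sort data whose type already has an implicit Ordering, such as numbers and strings. They sort it in ascending or descending order.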
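The sketch below runs locally on a plain Scala `List` rather than an RDD. It illustrates that the standard library already supplies `Ordering[Int]` and `Ordering[String]`, and that reversing a built-in Ordering gives a descending sort. A case class such as `Person1` has no such implicit Ordering, so calling `.sorted` on it would not compile. The object name `BuiltInOrderingDemo` is only illustrative.

```scala
// Local sketch (no Spark): built-in implicit Orderings for value types.
object BuiltInOrderingDemo {
  val ages: List[Int] = List(12, 13, 11)
  val names: List[String] = List("tom2", "tom", "tom1")

  // Ascending order uses the implicit Ordering[Int] / Ordering[String]
  val agesAsc: List[Int] = ages.sorted
  val namesAsc: List[String] = names.sorted

  // Descending order: reverse the built-in Ordering
  // (on an RDD, sortBy(x => x, ascending = false) has the same effect)
  val agesDesc: List[Int] = ages.sorted(Ordering.Int.reverse)

  def main(args: Array[String]): Unit = {
    println(agesAsc)
    println(namesAsc)
    println(agesDesc)
  }
}
```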

Here, custom objects must be sorted by a custom rule: age descending, then name ascending.

A list of Person objects:

  /**
   * Person case class
   * @param name
   * @param age
   */
  case class Person1(name: String, age: Int) {
    override def toString = {
      "name: " + name + ",age: " + age
    }
  }
  val list = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))

sortBy: sorting a single-value RDD

Method 1: the class extends Ordered

  // The class extends Ordered
  case class Person(name: String, age: Int) extends Ordered[Person] with Serializable {
    // Override toString
    override def toString = {
      "name: " + name + ",age: " + age
    }

    // Custom ordering
    override def compare(that: Person): Int = {
      // First sort by age, descending
      var result = -this.age.compareTo(that.age)
      // If the ages are equal, sort by name, ascending
      if (result == 0) {
        result = this.name.compareTo(that.name)
      }
      result
    }
  }

Usage:

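    import org.apache.spark.{SparkConf, SparkContext}

    val list = List(Person("tom", 12), Person("tom1", 13), Person("tom2", 13))
    val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // sortBy(x => x) uses the Ordering derived from Person's compare method
    sc.makeRDD(list).sortBy(x => x).saveAsTextFile("output2")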
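The Ordered-based rule can be checked on a local `List` without starting Spark. `List.sorted` finds an `Ordering[Person]` through `Ordered` in the same way that `rdd.sortBy(x => x)` does. The minimal sketch below copies the compare logic from Method 1. The wrapping object `OrderedPersonDemo` is hypothetical.

```scala
// Local check (no Spark) of Method 1: age descending, then name ascending.
object OrderedPersonDemo {
  case class Person(name: String, age: Int) extends Ordered[Person] {
    override def toString: String = "name: " + name + ",age: " + age

    override def compare(that: Person): Int = {
      // Negating the age comparison gives descending age order
      val byAge = -this.age.compareTo(that.age)
      // Ties on age fall back to ascending name order
      if (byAge != 0) byAge else this.name.compareTo(that.name)
    }
  }

  val list: List[Person] = List(Person("tom", 12), Person("tom1", 13), Person("tom2", 13))

  // The implicit Ordering[Person] comes from Person extending Ordered[Person]
  val sorted: List[Person] = list.sorted

  def main(args: Array[String]): Unit = sorted.foreach(println)
}
```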

Method 2: supply an Ordering

  // The class itself needs no changes
  case class Person1(name: String, age: Int) {
    override def toString = {
      "name: " + name + ",age: " + age
    }
  }

Usage:

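    import scala.reflect.ClassTag

    val list = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))
    val rdd = sc.makeRDD(list)
    // Custom ordering: age descending, then name ascending. The field to sort by first
    // goes first in the tuple; Ordering.Int.reverse turns the age sort into a descending one.
    // The key type is (Int, String), so the ClassTag must describe that tuple.
    rdd.sortBy(person => (person.age, person.name), numPartitions = 1)(
      Ordering.Tuple2(Ordering.Int.reverse, Ordering.String),
      ClassTag(classOf[(Int, String)])).saveAsTextFile("output5")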
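The tuple key and its explicit Ordering can also be tried locally. `List.sortBy` takes a key function plus an Ordering, which is similar to `RDD.sortBy`, but it does not need a ClassTag. This sketch assumes the same `(age, name)` key as the Spark call. The object name `TupleOrderingDemo` is illustrative only.

```scala
// Local sketch (no Spark) of the Method 2 key: (age, name) with a composed tuple Ordering.
object TupleOrderingDemo {
  case class Person1(name: String, age: Int) {
    override def toString: String = "name: " + name + ",age: " + age
  }

  val list: List[Person1] = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))

  // First component (age) descending, second component (name) ascending
  val keyOrdering: Ordering[(Int, String)] =
    Ordering.Tuple2(Ordering.Int.reverse, Ordering.String)

  // Key function first, Ordering passed explicitly in the second parameter list
  val sorted: List[Person1] = list.sortBy(p => (p.age, p.name))(keyOrdering)

  def main(args: Array[String]): Unit = sorted.foreach(println)
}
```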

sortByKey: sorting a key-value RDD

sortByKey sorts key-value data by the key only.

Method 1: the class extends Ordered

This works the same way as Method 1 for sortBy: the case class extends Ordered.

Then convert each single value into a key-value pair whose key is the Person object.
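The original gives no code for this method. The minimal sketch below runs locally with no Spark: it pairs each Person with 1 and sorts the pairs by key, so the key's `Ordered` implementation decides the order. On an RDD, the assumed equivalent would be `rdd.map((_, 1)).sortByKey()`. The object name `OrderedKeyDemo` is hypothetical.

```scala
// Local sketch (no Spark) of sortByKey Method 1: an Ordered key in (Person, 1) pairs.
object OrderedKeyDemo {
  case class Person(name: String, age: Int) extends Ordered[Person] {
    override def toString: String = "name: " + name + ",age: " + age

    // Same rule as sortBy Method 1: age descending, then name ascending
    override def compare(that: Person): Int = {
      val byAge = -this.age.compareTo(that.age)
      if (byAge != 0) byAge else this.name.compareTo(that.name)
    }
  }

  val list: List[Person] = List(Person("tom", 12), Person("tom1", 13), Person("tom2", 13))

  // Turn single values into K-V pairs with the Person as key
  val pairs: List[(Person, Int)] = list.map((_, 1))

  // Sort by key only; the values play no part in the comparison
  val sortedPairs: List[(Person, Int)] = pairs.sortBy(_._1)

  def main(args: Array[String]): Unit = sortedPairs.foreach(println)
}
```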

Method 2: supply an Ordering

Create an implicit Ordering[Person1] comparator for the Person1 type. sortByKey picks it up as the ordering for the Person1 keys:

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: custom sorting with sortByKey and an implicit Ordering
 * @author: HaoWu
 * @create: 2020-08-04
 */
object SortByKeyOrderingTest {
  def main(args: Array[String]): Unit = {
    // Implicit Ordering[Person1] comparator for the Person1 type
    implicit val ord: Ordering[Person1] = new Ordering[Person1] {
      // Custom ordering: age descending, then name ascending
      override def compare(x: Person1, y: Person1): Int = {
        // age descending
        var result = -x.age.compareTo(y.age)
        // if the ages are equal, name ascending
        if (result == 0) {
          result = x.name.compareTo(y.name)
        }
        result
      }
    }
    val list = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))
    val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(list)
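    // Convert to K-V pairs (Person1 -> 1) and sort by key with the implicit ord above;
    // coalesce(1) merges the sorted partitions into a single output file
    rdd.map((_, 1)).sortByKey().coalesce(1).saveAsTextFile("output")
    sc.stop()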
  }
}

/**
 * Person case class
 * @param name
 * @param age
 */
case class Person1(name: String, age: Int) {
  override def toString = {
    "name: " + name + ",age: " + age
  }
}

Result:
(name: tom1,age: 13,1)
(name: tom2,age: 13,1)
(name: tom,age: 12,1)
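In the result, the two 13-year-olds come first, with tom1 before tom2, followed by tom (12). As a hedged alternative that is not the author's code, the hand-written `compare` in `ord` could also be built from the `(age, name)` tuple with the standard library's `Ordering.by`. The sketch below checks this on a local `List` without Spark. The object name `OrderingByDemo` is hypothetical.

```scala
// Hypothetical alternative (no Spark): derive Ordering[Person1] with Ordering.by.
object OrderingByDemo {
  case class Person1(name: String, age: Int) {
    override def toString: String = "name: " + name + ",age: " + age
  }

  // Same rule as the hand-written ord: age descending, then name ascending
  implicit val ord: Ordering[Person1] =
    Ordering.by[Person1, (Int, String)](p => (p.age, p.name))(
      Ordering.Tuple2(Ordering.Int.reverse, Ordering.String))

  val list: List[Person1] = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))

  // Sorting the (Person1, 1) pairs by key uses the implicit ord defined above
  val sortedPairs: List[(Person1, Int)] = list.map((_, 1)).sortBy(_._1)

  def main(args: Array[String]): Unit = sortedPairs.foreach(println)
}
```

Deriving the Ordering from a tuple avoids hand-written tie-breaking logic. The hand-written version makes each comparison step explicit.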
Original article: https://www.cnblogs.com/wh984763176/p/13432806.html