RDD实例

实例一:

teacher.log

http://bigdata.baidu.cn/zhangsan
http://bigdata.baidu.cn/zhangsan
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://php.baidu.cn/laoli
http://php.baidu.cn/laoliu
http://php.baidu.cn/laoli
http://php.baidu.cn/laoli

全局topn  组内topn

代码:

package dayo1

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object teacher2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf ()
      .setAppName ( this.getClass.getSimpleName )
      .setMaster ( "local[1]" )

    val sc = new SparkContext ( conf )

    val lines = sc.textFile ( "E:\teacher.log" )

    val overAll: RDD[((String, String), Int)] = lines.map ( tp => {
      val teacher: String = tp.split ( "/" ).last
      val host = new URL ( tp ).getHost
      val subject = host.substring ( 0, host.indexOf ( "." ) )
      ((teacher, subject), 1)
    } )
    //所有科目和老师的前三
    val topOverAll = overAll.reduceByKey ( _ + _ ).sortBy ( -_._2 ).take ( 3 ).foreach ( println )

    //每个科目前两名的老师
    val topGroup = overAll.reduceByKey ( _ + _ ).groupBy ( _._1._2 ).mapValues ( _.toList.sortBy ( -_._2 ).take ( 2 ) ).foreach ( println )
    sc.stop ()


  }
}

实例二:

去重

file1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c


file2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

代码:

package dayo1

import org.apache.spark.{SparkConf, SparkContext}

object distinct {
  def main(args: Array[String]): Unit = {
    val cof = new SparkConf ()
      .setAppName ( this.getClass.getSimpleName )
      .setMaster ( "local[1]" )

    val sc = new SparkContext ( cof )

    val file1 = sc.textFile ( "E:\file1.txt" )
    val file2 = sc.textFile ( "E:\file2.txt" )
    val list = file1.union ( file2 ).distinct ().sortBy ( tp => tp )
    list.foreach ( println )
    sc.stop ()
  }
}

 实例三:

temperature.txt

0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999

需求:分析每年的最高温度

代码:

package dayo1

import org.apache.spark.{SparkConf, SparkContext}

/**
  * 0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
  * 0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
  * 0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
  * 0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
  * 0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
  * 0067011990999991950051507004888888880500001N9+00781+9999999999999999999999
  *
  * 12345678911234567892123456789312345678941234567895123456789612345678971234
  * 需求:分析每年的最高温度
  * 数据说明:
  *
  *
  * 第15-19个字符是year 6-9
  *
  * 第45-50位是温度表示,+表示零上 -表示零下,且温度的值不能是9999,9999表示异常数据
  *
  * 第50位值只能是0、1、4、5、9几个数字
  */
object temperature {
  def main(args: Array[String]): Unit = {
    val cof = new SparkConf ()
      .setAppName ( this.getClass.getSimpleName )
      .setMaster ( "local[*]" )
    val sc = new SparkContext ( cof )

    val lines = sc.textFile ( "E:\temperature.txt" )

    val yearAndTemp = lines.filter ( tp => {
      var temp = 0
      val query = tp.charAt ( 50 ).toString //val query=tp.subString(50,51)
      if (tp.charAt ( 45 ).equals ( "+" )) {
        temp = tp.substring ( 45, 50 ).toInt
      } else {
        temp = tp.substring ( 45, 50 ).toInt
      }
      temp != 9999 && query.matches ( "[01459]" )

    } ).map ( tp => {

      val year = tp.substring ( 15, 19 )
      var temp = 0
      if (tp.charAt ( 45 ).equals ( "+" )) {
        temp = tp.substring ( 45, 50 ).toInt
      } else {
        temp = tp.substring ( 45, 50 ).toInt
      }

      (year, temp)
    } )


    val res = yearAndTemp.reduceByKey ( (x, y) => if (x > y) x else y )

    res.foreach ( tp => println ( "year:" + tp._1 + "  temp:" + tp._2 ) )
    sc.stop ()
  }
}
原文地址:https://www.cnblogs.com/wangshuang123/p/11058723.html