spark textfile rdd 日记

批量处理模板方法, 核心处理方法为内部方法

  def batchProces(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {

    //自定义RDD,此处为demo
    val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
    dataRDD.mapPartitions(iterator => {

      val rawData = iterator.toList
      var lstT = new ListBuffer[(Int, Int)]()

      rawData.foreach(v => {
        if (lstT.size < 50) {
          lstT.append((v, 1))
        } else {
          //每50处理一次
          procesData()
        }
      })

      //剩余的继续处理
      procesData()

      //批量处理逻辑
      def procesData() = {

        //核心处理逻辑
        // doProcess
        //很重要
        lstT.clear()
      }

      lstT.iterator

    }).map((_, 1)).reduceByKey(_ + _).sortByKey().saveAsTextFile("hdfs://hdfscluster/tmp/logs/")
  }

批量处理模板方法, 核心处理方法为外部方法

  def process_outer(lst: List[(Int, Int)]) = {
    //外部核心处理逻辑,如Request请求等
    RequestUtil.postJson("http://xxx", "{paraData}", 1000)
  }

  def batchProces_processOuter(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
    val fooCount = sc.longAccumulator("fooCount")
    //自定义RDD,此处为demo
    val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
    dataRDD.foreachPartition(iterator => {


      val rawData = iterator.toList
      var lstT = new ListBuffer[(Int, Int)]()

      rawData.foreach(v => {
        if (lstT.size < 50) {
          lstT.append((v, 1))
        } else {
          //每50处理一次
          process_outer(lstT.toList)
          fooCount.add(lstT.size)
          lstT.clear()
        }
      })

      //剩余的继续处理
      if (lstT.size > 0) {
        process_outer(lstT.toList)
        fooCount.add(lstT.size)
        lstT.clear()
      }
    });
    println("total =>" + fooCount.value)
  }

 

针对文本文件RDD的一些处理逻辑:

  //针对单个文件,每行数据超长的情况, 先对行进行拆分,再重新分区,将数据交给多个executor去执行
  def bigLine(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
    val fileRDD = sc.textFile("hdfs://hdfscluster/tmp/logs/abc.txt", numPartitions)

    //对于长文本, 先拆分,然后重新分区,提高并发机器利用率, 减少job执行时间
    fileRDD.flatMap(_.split(",")).repartition(24).foreach(println(_))
  }

  //针对无规律零散路径,循环内部使用sc
  def handlerPath_lingsan(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
    val rawPath: List[String] = List("hdfs://hdfscluster/tmp1/path1", "hdfs://hdfscluster/tmp2/path2", "hdfs://hdfscluster/tmp3/path3")
    val lsResult = rawPath.flatMap(v => {
      sc.textFile(v).map((_, 1)).collect().toList
    }).toList.foreach(println(_))
  }

  //针对文件夹, 
  def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
    //按行输出指定文件夹下所有文件,分区有效
    val txtRDD = sc.textFile("hdfs://hdfscluster/tmp1/*", numPartitions)
    //重新分区,便于输出结果
    txtRDD.map((_, 1)).repartition(1)
      .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")
  }

  //针对文件夹,且路径下文件数量比较多且比较小的情况 
  def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {

    //返回结果key=文件路径,val=文件内容, 如果content太大的话,容易造成OOM
    val dirRDD = sc.wholeTextFiles("hdfs://hdfscluster/tmp1/*", numPartitions)
    dirRDD.flatMap(v => {
      v._2.split(System.lineSeparator()).map((_, 1))
    }).repartition(1).saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")

  }

//java scala转换

  def java_scala_collection_convert = {
    var lstT = new ListBuffer[Int]()
    //注意java,scala转换
    import scala.collection.JavaConverters._
    val lstBack = SensitiveDevice.batchDecrypt(lstT.toList.asJava).asScala
  }
原文地址:https://www.cnblogs.com/snow-man/p/13686099.html