spark-wordcount: testing the sample operator

import org.apache.spark.{SparkConf, SparkContext}

object radomSampleU {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount_groupBy")
      .setMaster("local")
      //  .set("spark.default.parallelism", "100") // 1. tune the default parallelism
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "5")
      .set("spark.executor.instances", "4")
      // legacy (static) memory management; spark.storage.memoryFraction only applies when useLegacyMode is true
      .set("spark.memory.useLegacyMode", "false")
      .set("spark.storage.memoryFraction", "0.3") // 5. fraction of memory used for cache, default = 0.6
      // unified memory management
      .set("spark.memory.fraction", "0.3") // default = 0.6
      .set("spark.memory.storageFraction", "0.9") // default = 0.5
      .set("spark.shuffle.consolidateFiles", "false")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR") // filter out extraneous log output
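    // A sanity-check sketch (not in the original listing): read back the
    // effective values to confirm the unified-memory settings above were
    // applied. SparkConf.getOption returns None instead of throwing for
    // keys that were never set.
    println("spark.memory.fraction = " + sc.getConf.getOption("spark.memory.fraction"))
    println("spark.memory.storageFraction = " + sc.getConf.getOption("spark.memory.storageFraction"))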
    val startTime=System.currentTimeMillis()
    val inpath = "F:\\hml\\dataset\\1021\\1021\\####.txt" // backslashes must be escaped in a Scala string literal
    val lines = sc.textFile(inpath) //.cache()   // read the local file
    val words = lines.flatMap(_.split(" ")).filter(word => word.nonEmpty) // split into words and drop empty tokens; further filters (e.g. punctuation) could be added here
    // sample operator test
    println("number of partitions: " + words.partitions.size)
    println("Sample aggregation result ***********************************")
    val wordsample = words.sample(withReplacement = false, fraction = 0.0005)
    wordsample.map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
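
    // A sketch of the sample operator's variants (illustrative values, not
    // from the original test): a third argument seeds the sampler so runs
    // are reproducible, and takeSample is the action variant that returns
    // an Array to the driver instead of an RDD.
    val seededSample = words.sample(withReplacement = false, fraction = 0.0005, seed = 42L)
    println("seeded sample size: " + seededSample.count())
    val driverSample = words.takeSample(withReplacement = false, num = 100)
    println("takeSample returned " + driverSample.length + " words")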

println("总体数据汇总结果*************************")
    val pairs = words.map(word => (word, 1))  // 在单词拆分的基础上对每个单词实例计数为1, 也就是 word => (word, 1)
    val start1=System.currentTimeMillis()
    val wordscount = pairs.reduceByKey(_ + _) //.collect().foreach(println) // sum the values of identical keys, giving each word's total count; reduceByKey is lazy, so nothing executes yet
    val end1=System.currentTimeMillis()
    wordscount.collect.foreach(println) // print the result; collect gathers all cluster data onto the driver machine, which must have room to hold it
    val endTime=System.currentTimeMillis()
    println("应用总耗时"+(endTime-startTime))
    println("reduceByKey耗时"+ (end1-start1))
    Thread.sleep(1000000)
    sc.stop()   // 释放资源
  }
}
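
One caveat on the timings above: reduceByKey is a transformation, so end1 - start1 only measures how long it takes to build the lazy RDD lineage, not the shuffle itself; the real work runs inside collect. A minimal sketch of timing the aggregation by forcing it with an action (count here, reusing the same pairs RDD):

    val t0 = System.currentTimeMillis()
    val distinctWords = pairs.reduceByKey(_ + _).count() // count() is an action, so the shuffle actually runs here
    val t1 = System.currentTimeMillis()
    println("reduceByKey + count: " + (t1 - t0) + " ms, " + distinctWords + " distinct words")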
Original article: https://www.cnblogs.com/moonlightml/p/10221081.html