spark-------------8种实现wordCount方法

引言

通过学习RDD,并了解和掌握RDD的转换算子和行动算子。现在对所有能实现wordCount的功能的算子总结一下。

正文

用了8个方法来实现wordCount。通过对比,发现有些方法类似。运行结果读者自行验证

代码

package com.xiao.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AllWoldCount {
  def main(args: Array[String]): Unit = {



      // 建立和spark框架的连接
      val conf = new SparkConf().setMaster("local").setAppName("WordCount");
      val sc = new SparkContext(conf);
      wordCount8(sc)
      sc.stop();

  }
    //  groupBy
    def wordCount1(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将所有相同的单词放到一个元组里
        val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
        val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
        println(wordCount.collect().mkString(","))
    }

    // groupByKey
    def wordCount2(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将每个单词变成 word => (word,1)
        val wordToOne: RDD[(String, Int)] = words.map(word => (word,1))  //ords.map((_,1))
        // (scala,CompactBuffer(1)),(spark,CompactBuffer(1)),(hello,CompactBuffer(1, 1))
        val group: RDD[(String, Iterable[Int])] = wordToOne.groupByKey()
        val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
        println(wordCount.collect().mkString(","))
    }

    // reduceByKey
    def wordCount3(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将每个单词变成 word => (word,1)
        val wordToOne: RDD[(String, Int)] = words.map((_,1))
        val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
        println(wordCount.collect().mkString(","))
    }

    // aggregateByKey
    def wordCount4(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将每个单词变成 word => (word,1)
        val wordToOne: RDD[(String, Int)] = words.map((_,1))
        // 初始值 分区内的操作 分区间操作
        val wordCount: RDD[(String, Int)] = wordToOne.aggregateByKey(0)(_+_,_+_)
        println(wordCount.collect().mkString(","))
    }

    // foldByKey
    def wordCount5(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将每个单词变成 word => (word,1)
        val wordToOne: RDD[(String, Int)] = words.map((_,1))
        // 初始值 分区内和分区间操作相同
        val wordCount: RDD[(String, Int)] = wordToOne.foldByKey(0)(_+_)
        println(wordCount.collect().mkString(","))
    }

    // combineByKey
    def wordCount6(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将每个单词变成 word => (word,1)
        val wordToOne: RDD[(String, Int)] = words.map((_,1))
        // 初始值 分区内和分区间操作相同
        val wordCount: RDD[(String, Int)] = wordToOne.combineByKey(
            v => v,     // 初始值的操作
            (x : Int,y :Int) => x+y,  // 分区内的操作
            (x : Int,y :Int) => x+y,  // 分区间的操作
        )
        println(wordCount.collect().mkString(","))
    }

    // countByKey
    def wordCount7(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        // 将每个单词变成 word => (word,1)
        val wordToOne: RDD[(String, Int)] = words.map((_,1))
        val wordCount: collection.Map[String, Long] = wordToOne.countByKey()
        println(wordCount)
    }

    // countByValue
    def wordCount8(sc : SparkContext): Unit ={
        val rdd: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
        // 扁平化操作:拆分所有句子,把所有单词放到一个list里
        val words: RDD[String] = rdd.flatMap(_.split(" "))
        val wordCount: collection.Map[String, Long] = words.countByValue()
        println(wordCount)
    }
}
原文地址:https://www.cnblogs.com/yangxiao-/p/14347390.html