11 WordCount Solutions Written with Spark Core Operators

Eleven solutions for WordCount, written with the Spark Core API:

package com.fym.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark03_WordCount {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    wordCount11(sc)
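    // any of wordCount1 through wordCount10 can be swapped in here to run a different variant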

    sc.stop()
  }

  //groupBy
  def wordCount1(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val group:RDD[(String,Iterable[String])] = words.groupBy(word => word)
    val wordCount:RDD[(String,Int)] = group.mapValues(iter => iter.size)
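    // Addition (not in the original post): the transformations above are lazy, so an
    // action is needed before anything actually runs; the same applies to
    // wordCount2 through wordCount6 below.
    wordCount.collect().foreach(println) // expected, in some order: (Hello,2), (Scala,1), (Spark,1)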

  }

  // groupByKey: involves a shuffle, so performance suffers when the data volume is large
  def wordCount2(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val group:RDD[(String,Iterable[Int])] = wordOne.groupByKey()
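    // every 1 for a key is shuffled and buffered before being counted below,
    // which is why the pre-aggregating reduceByKey in wordCount3 is usually preferred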
    val wordCount:RDD[(String,Int)] = group.mapValues(iter => iter.size)

  }

  // reduceByKey: pre-aggregates on the map side, reducing the amount of data spilled to disk during the shuffle
  def wordCount3(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount:RDD[(String,Int)] = wordOne.reduceByKey(_ + _)


  }

  //aggregateByKey
  def wordCount4(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount:RDD[(String,Int)] = wordOne.aggregateByKey(0)(_+_,_+_)
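    // aggregateByKey(0): 0 is the initial per-key value within each partition; the first
    // function combines values inside a partition, the second merges the per-partition
    // results. With both functions identical, foldByKey in wordCount5 is the shorthand.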

  }

  //foldByKey
  def wordCount5(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount:RDD[(String,Int)] = wordOne.foldByKey(0)(_+_)

  }

  //combineByKey
  def wordCount6(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount:RDD[(String,Int)] = wordOne.combineByKey(
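      // combineByKey takes three functions: createCombiner (turn the first value seen
      // for a key into the accumulator), mergeValue (fold a value into the accumulator
      // within a partition), and mergeCombiners (merge accumulators across partitions)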
      v=>v,
      // the first function's value conversion has to be identified dynamically,
      // so the two functions below need explicit parameter types
      (x:Int,y:Int) => x+y,
      (x:Int,y:Int) => x+y
    )

  }
  //countByKey
  def wordCount7(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordcount:scala.collection.Map[String,Long] = wordOne.countByKey()
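    // countByKey is an action: it returns a local scala.collection.Map on the driver,
    // so no extra collect() is needed
    println(wordcount) // addition, not in the original post; e.g. Map(Hello -> 2, Scala -> 1, Spark -> 1)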

  }
  //countByValue
  def wordCount8(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))

    val wordcount:scala.collection.Map[String,Long] = words.countByValue()
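    // countByValue is also an action; it counts each distinct element directly,
    // so the (word, 1) mapping step is not needed here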

  }

  //reduce
  def wordCount9(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))

    // desired result shape: [(word1, sum1), (word2, sum2), ...]
    // string => (string,1)
    val mapWord = words.map(
      word => {
        mutable.Map[String, Long]((word, 1))
      }
    )
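    // each word is now a single-entry mutable Map, e.g. "Hello" => Map(Hello -> 1);
    // reduce below merges these maps pairwise, summing the counts of shared keys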

    val wordcount = mapWord.reduce(
      (map1, map2) => {
        map2.foreach {
          case (word, count) => {
            val newcount = map1.getOrElse(word, 0L) + count
            map1.update(word, newcount)
          }
        }
        map1
      }
    )

    println(wordcount)
  }
  //aggregate
  def wordCount10(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))

    // desired result shape: [(word1, sum1), (word2, sum2), ...]
    // string => (string,1)
    val mapWord = words.map(
      word => {
        mutable.Map[String, Long]((word, 1))
      }
    )
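    // aggregate takes a zero value (an empty mutable Map here) plus two functions:
    // the first folds elements into the accumulator within a partition, the second
    // merges accumulators across partitions. Both perform the same map merge here,
    // which is why fold in wordCount11 can replace them with a single function.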

    val wordcount = mapWord.aggregate(mutable.Map[String, Long]())(
      (map1, map2) => {
        map2.foreach {
          case (word, count) => {
            val newcount = map1.getOrElse(word, 0L) + count
            map1.update(word, newcount)
          }
        }
        map1
      },
      (map1, map2) => {
        map2.foreach {
          case (word, count) => {
            val newcount = map1.getOrElse(word, 0L) + count
            map1.update(word, newcount)
          }
        }
        map1
      }
    )

    println(wordcount)
  }
  //fold
  def wordCount11(sc:SparkContext): Unit ={

    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))

    val words = rdd.flatMap(_.split(" "))

    // desired result shape: [(word1, sum1), (word2, sum2), ...]
    // string => (string,1)
    val mapWord = words.map(
      word => {
        mutable.Map[String, Long]((word, 1))
      }
    )

    val wordcount = mapWord.fold(mutable.Map[String, Long]())(
      (map1, map2) => {
        map2.foreach {
          case (word, count) => {
            val newcount = map1.getOrElse(word, 0L) + count
            map1.update(word, newcount)
          }
        }
        map1
      }
    )

    println(wordcount)
  }
}
Original article: https://www.cnblogs.com/yxym2016/p/14196874.html