Spark Streaming 的有状态（updateStateByKey）示例

import org.apache.spark._
import org.apache.spark.streaming._


/**
  * Stateful Spark Streaming example: keeps a running word count, across
  * batches, of the words read from a TCP text socket.
  *
  * Created by code-pc on 16/3/14.
  */
object Pi {
  /**
    * Folds the counts that arrived for one key in the current batch into the
    * running total for that key.
    *
    * @param newValues    counts observed for the key in the current batch (may be empty)
    * @param runningCount total accumulated so far, or `None` if there is no previous state
    * @return the updated total, always wrapped in `Some`
    */
  def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  /**
    * Splits a line into words on single spaces.
    *
    * Empty tokens are discarded: `String.split` yields them for blank lines and
    * for leading or consecutive spaces, and they would otherwise be counted as
    * the word `""`.
    *
    * @param line one line of input text
    * @return the non-empty, space-separated tokens of `line`, in order
    */
  def splitWords(line: String): Seq[String] =
    line.split(" ").filter(_.nonEmpty).toSeq

  /**
    * Builds and runs the streaming job: reads lines from 127.0.0.1:9997,
    * prints the per-batch (word, 1) pairs and the running per-word totals
    * every 4 seconds, and blocks until the streaming context terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
    val ssc = new StreamingContext(conf, Seconds(4))
    // updateStateByKey requires a checkpoint directory to be configured.
    ssc.checkpoint("/tmp")

    val lines = ssc.socketTextStream("127.0.0.1", 9997)
    val words = lines.flatMap(splitWords(_))
    val pairs = words.map((_, 1))

    // Per-batch view: one (word, 1) pair for every word received in this batch.
    pairs.print()

    // Running view: total occurrences of each word since the job started.
    val retDS = pairs.updateStateByKey(updateStateFunction _)

    retDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

测试用的 TCP 数据源：在另一个终端运行下面的命令，然后输入以空格分隔的单词：
nc -lk 9997

原文地址:https://www.cnblogs.com/ggzone/p/10121132.html