SparkStreaming

1. WordCount

package dayo7

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // create the SparkConf
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    // create the StreamingContext; the second argument is the batch interval
    val ssc = new StreamingContext(conf, Seconds(2))

    // receive data from the socket
    val result = ssc.socketTextStream("192.168.186.150", 1234)

    // process the data and print it (print() returns Unit, so no val is needed)
    result.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    // start the streaming computation
    ssc.start()

    // block the main thread until the computation terminates
    ssc.awaitTermination()
  }
}
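To test this, start a simple socket source on the host the stream connects to (192.168.186.150 above) before launching the job, e.g. with netcat: nc -lk 1234, then type lines of space-separated words. Every 2-second batch prints the word counts for the lines received in that interval.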

2. Writing the data to Redis

Start Redis: bin/redis-server etc/redis.conf; check the process and port: ps -ef | grep redis

package dayo7

import day08.Jpoods
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountToRedis1 {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // create the SparkConf
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    // create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))

    // read the data from the socket
    val result: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.186.150", 1234)


    result.foreachRDD(rdd => {
      // process the data
      val result2 = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

      // write to Redis, one connection per partition
      result2.foreachPartition(partition => {
        val jedis = Jpoods.getJedis()

        // increment the count of each word in the Redis hash "zi"
        partition.foreach(tp => {
          jedis.hincrBy("zi", tp._1, tp._2)
        })

        // close the jedis connection (returns it to the pool)
        jedis.close()
      })
    })

    // start SparkStreaming
    ssc.start()

    // block the main thread until the computation terminates
    ssc.awaitTermination()
  }
}
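The Jpoods helper imported from day08 is not shown in the post. A minimal sketch of what such a helper typically looks like, assuming it wraps a Jedis connection pool (the pool sizes and the default Redis port 6379 below are assumptions):

package day08

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object Jpoods {
  // pool configuration; the sizes here are illustrative assumptions
  private val config = new JedisPoolConfig()
  config.setMaxTotal(20)
  config.setMaxIdle(10)

  // connection pool pointing at the Redis server started above
  private val pool = new JedisPool(config, "192.168.186.150", 6379)

  // hand out one connection; callers return it to the pool via jedis.close()
  def getJedis(): Jedis = pool.getResource
}

Once the job is running, you can verify the counts from the Redis CLI with: hgetall zi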

3. Complete version of SparkStreaming

package sortby

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountSortBy2 {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // load the StreamingContext from the checkpoint directory, or create it with creatingFunc if none exists
    val ssc = StreamingContext.getOrCreate("./test1", creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }


  def creatingFunc(): StreamingContext = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")

    // batch interval: how often a batch is processed
    val ssc = new StreamingContext(conf, Seconds(3))

    // checkpoint directory for saving state and recovery data
    ssc.checkpoint("./test1")

    // receive data from the socket
    val lines = ssc.socketTextStream("192.168.186.150", 1234)

    // checkpoint interval: how often the DStream data is saved
    lines.checkpoint(Seconds(5))

    // process the data, keeping a running count per word across batches
    lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc).print()

    ssc
  }

  // Seq[Int]: the new values for each key in the current batch; Option[Int]: the previously saved state for that key
  def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {
    Some(seq.sum + option.getOrElse(0))
  }
}
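As a quick sanity check of updateFunc: if a key arrives three times in the current batch and its saved count is 5, the new state becomes 8; a key seen for the first time has no saved state, so option is None:

updateFunc(Seq(1, 1, 1), Some(5)) // => Some(8)
updateFunc(Seq(1), None)          // first occurrence => Some(1)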
Original article: https://www.cnblogs.com/wangshuang123/p/11113347.html