Spark Streaming (1): wordCount demo

I. pom

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <!-- spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <!-- log4j 1.2.17 -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>

II. Recap: how is word count done in Hive?

Link: https://www.cnblogs.com/blogyuhan/p/9269057.html

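Key idea: split each line of text into an array of words, use explode to turn that array into one row per word, then group by the word and count(1).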
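The same split / explode / group by pipeline can be mirrored with plain Scala collections, which also previews the flatMap / map / reduceByKey chain used in section III. This is only an analogy written in Scala, not Hive syntax; the object name HiveStyleWordCount and the sample lines are made up for illustration.

```scala
object HiveStyleWordCount {
  // Mirrors the Hive approach: split -> explode -> group by -> count(1).
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split + explode: one element per word
      .filter(_.nonEmpty)      // ignore empty tokens caused by repeated spaces
      .groupBy(identity)       // group by word
      .map { case (word, occurrences) => (word, occurrences.size) } // count(1)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample input
    val lines = Seq("hello spark", "hello hive")
    wordCount(lines).toSeq.sortBy(_._1).foreach(println)
  }
}
```

flatMap plays the role of split followed by explode, and groupBy plus size plays the role of group by plus count(1).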

III. Counting words in a text stream with Spark Streaming

1-1 Using reduceByKey

1) Install netcat and open a port:

yum install nmap-ncat.x86_64
nc -l 1234 # listen on port 1234; the streaming job connects to it

2) Run the streaming job:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object MySpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("app")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Batch interval: Seconds(5) above means one batch every 5 seconds
    val ds = ssc.socketTextStream("192.168.56.111", 1234)
    val res = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    res.print()
    // Start the streaming computation
    ssc.start()
    ssc.awaitTermination() // block until the job is stopped or fails
  }
}

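Result: each 5-second batch prints the counts of the words received in that batch only; nothing is carried over between batches.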

2-1 Using updateStateByKey ("update state by key"), with Int as the state type

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object MySpark {
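  // For each key: x._1 is the word, x._2 the new counts in this batch, x._3 the previous total
  val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map(x => {
      (x._1, x._2.sum + x._3.getOrElse(0))
    })
  }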

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("app")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Batch interval: Seconds(5) above means one batch every 5 seconds
    val ds = ssc.socketTextStream("192.168.56.111", 1234)
    // val res = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // updateStateByKey requires a checkpoint directory to be set
    sc.setCheckpointDir("D:/cks")
    // Cumulative word count across batches
    val res = ds.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true)
    res.print()
    // Start the streaming computation
    ssc.start()
    ssc.awaitTermination() // block until the job is stopped or fails
  }
}

Result:

(www,9)

(aaa,11) (counts accumulate across batches)
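The accumulation can be checked without a cluster by applying the same update function by hand. The sketch below is a plain Scala simulation, not Spark itself: the helper runBatch and the sample batches are hypothetical, and it only mimics how updateStateByKey hands each key its new values for the batch together with the previous state.

```scala
object UpdateStateSimulation {
  // Same shape as the update function in 2-1: (key, new values in this batch, previous state)
  val updateFunc: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
    it => it.map { case (word, newValues, oldState) =>
      (word, newValues.sum + oldState.getOrElse(0))
    }

  // Hypothetical helper: feeds one batch of (word, 1) pairs plus the old state to updateFunc.
  def runBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    // Keys that already have state are visited even if the batch has no new values for them.
    val keys = state.keySet ++ grouped.keySet
    val input = keys.iterator.map(k => (k, grouped.getOrElse(k, Seq.empty[Int]), state.get(k)))
    updateFunc(input).toMap
  }

  def main(args: Array[String]): Unit = {
    // Two hypothetical batches
    val afterBatch1 = runBatch(Map.empty, Seq(("aaa", 1), ("www", 1), ("aaa", 1)))
    val afterBatch2 = runBatch(afterBatch1, Seq(("aaa", 1)))
    println(afterBatch1)
    println(afterBatch2)
  }
}
```

Unlike the reduceByKey version in 1-1, the second batch starts from the totals of the first one, because the previous state arrives as the Option and is added to the new sum.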

2-2 Values are a custom case class: map((_, Userinfos(1, "zc", 2)))

Case 1: (_, Userinfos(1, "zc", 2)) is reduced to (_, Int), so the state is an Int:

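case class Userinfos(userid: Int, username: String, age: Int)

object MySpark {
  // New values are Userinfos, but the state kept per key is an Int (the running sum of age)
  val updateFunc: Iterator[(String, Seq[Userinfos], Option[Int])] => Iterator[(String, Int)] =
    (it: Iterator[(String, Seq[Userinfos], Option[Int])]) => {
      it.map(x => {
        (x._1, x._2.map(u => u.age).sum + x._3.getOrElse(0))
      })
    }
  // main is the same as in 2-1, except for map((_, Userinfos(1, "zc", 2)))
}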

Case 2: (_, Userinfos(1, "zc", 2)) is reduced to (_, Userinfos(1, "zc", sum)), so the state is itself a Userinfos:

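case class Userinfos(userid: Int, username: String, age: Int)

object MySpark {
  // Default used when a key has no previous state
  val obj = Userinfos(0, "", 0)
  // Both the new values and the state are Userinfos; only age is accumulated
  val updateFunc = (it: Iterator[(String, Seq[Userinfos], Option[Userinfos])]) => {
    it.map(x => {
      (x._1, Userinfos(1, "zc", x._2.map(u => u.age).sum + x._3.getOrElse(obj).age))
    })
  }
  // main is the same as in 2-1, except for map((_, Userinfos(1, "zc", 2)))
}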

2-3 Values are tuples of type (Int, String, Int)

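// Map the Seq of tuples to a Seq[Int], sum it, and add the third field of the previous state.
// The new state must have the same type as the Option, so a full tuple is returned.

(x._1, (1, "", x._2.map(u => u._3).sum + x._3.getOrElse((1, "", 0))._3))

With pattern matching, case (x, y, z), where y is the Seq of new values and z is the Option holding the previous state:

case (x, y, z) => (x, (1, "", y.map(u => u._3).sum + z.getOrElse((1, "", 0))._3))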
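A fuller sketch of the tuple variant, written with pattern matching, may make the types easier to follow. It assumes the state is kept as a full (Int, String, Int) tuple, since updateStateByKey expects the returned state to have the same type as the Option it receives. The names TupleStateUpdate and Info and the sample input are hypothetical, and the function is applied by hand to an Iterator instead of being run inside Spark.

```scala
object TupleStateUpdate {
  type Info = (Int, String, Int)

  // (key, new values in this batch, previous state) => (key, new state)
  val updateFunc: Iterator[(String, Seq[Info], Option[Info])] => Iterator[(String, Info)] =
    it => it.map {
      case (key, newValues, oldState) =>
        // Third field of the previous state, or 0 when the key is seen for the first time
        val previous = oldState.getOrElse((1, "", 0))._3
        (key, (1, "", newValues.map(_._3).sum + previous))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: one key, two new values, previous running total of 5
    val input = Iterator(("aaa", Seq((1, "zc", 2), (1, "zc", 2)), Option((1, "", 5))))
    updateFunc(input).foreach(println)
  }
}
```

Naming the three parts in the case pattern avoids the chained x._1, x._2, x._3 accessors and makes it clear which _3 belongs to a value and which to the state.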
Original article: https://www.cnblogs.com/sabertobih/p/14121019.html