Spark Streaming基础

一、Spark Streaming概述:
 
是基于Spark core的API,不需要单独安装,一盏式解决
 
可扩展、高吞吐量、容错性、能够运行在多节点、结合了批处理、机器学习、图计算等
 
将不同的数据源的数据经过Spark Streaming处理后输出到外部文件系统
 
 
1. 应用场景:
 
实时交易防欺诈检测、传感器异常实时反应
 
整理Spark发展史问题(缺少)
 
 
2. Spark Streaming工作原理:
 
粗粒度:
把实时数据流,以秒数拆分成批次的小数据块,通过Spark当成RDD来处理
 
细粒度:
 
 
 
3. 核心概念:
 
编程入口:StreamingContext
 
常用构造方法源码:
 
def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}
 
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
 
batchDuration 是必须填的,根据应用程序的延迟需求和资源可用情况来设置
 
定义好streamingContext后,再定义DStream、transformation等,通过start()开始,stop()结束
 
注意:
一个context启动后,不能再运行新的streaming(一个JVM只能有一个streamingContext)
 
一旦停止后,就没办法再重新开始
 
Stop方法默认把sparkContext和streamingContext同时关掉,要不想关掉sc,必须定义stopSparkContext参数为false
 
一个SparkContext能够创建多个StreamingContext
 
 
 最基础的抽象:Discretized Stream  (DStream)
 
一系列的RDD代表一个DStream,是不可变的、分布式的dataset
 
 
每一个RDD代表一个时间段(批次)的数据
 
对DStream进行操作算子(flatMap)时,在底层上看就是对每一个RDD做相同的操作,交由Spark core运行
 
 
 数据输入:Input DStreams and Receivers
 
每一个Input DStream 关联着一个Receiver(但从文件系统接收不需要receiver),receiver 接收数据并存在内存中
 
receiver需要占用一个线程,所以不能定义local[1],线程的数量n必须大于receivers的数量
 
 
转换:Transformations on DStreams
 
与RDD类似:map、flatMap、filter、repartition、count...
 
 
 数据输出:Output Operations and DStreams:
 
输出到数据库或者文件系统:
 
API:print、save、foreach
 
 
 
二、Spark Streaming实战部分:
 
  1. Spark Streaming处理socket数据:
 
接收到的数据进行WordCount操作:
 
在IDEA中:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming 处理Socket数据
* */
object NetWorkWordCount {
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
 
    //创建streamingContext的两个参数sparkConf和seconds
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //生成Input DStream
    val lines = ssc.socketTextStream("localhost", 6789)
 
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
 
    result.print()
 
    ssc.start()
    ssc.awaitTermination()
 
 
  }
}
 
在控制台中:
 
nc -lk 6789,创建一个Socket
 
在这上面输入数据,就可以在IDEA中count出来了
 
 
注意:
在执行过程中会报错,必须在Maven projects中找出报错提示中所缺少的包,并且在dependency上加入。
当projects中还没有的包,在http://mvnrepository.com 上搜索相应的dependency,然后让Maven帮我们自动下载。
 
 
 
 
  1. Spark Streaming处理HDFS中的数据:
 
ssc.textFileStream("file_path")
 
同样是像上面一样,只是改了stream的source
 
但是测试时,必须要是生成新的文件(官网称为moving进去的文件),才会被统计;而往旧的文件里再添加数据,也不会被统计了
 
 
 
  1. Spark Streaming进阶实战:
 
带状态的算子UpdateStateByKey、保存到MySQL、window函数
 
 
UpdateStateByKey实现实时更新:
 
允许把新旧状态结合,连续地更新
 
准备工作:
  1. 定义一个状态
  2. 定义状态更新的方法
 
注意:
 
  1. updateFunction需要隐式转换
  2. 报错:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set.
            意思就是要进行checkpoint记录
 
 
实现代码:
 
把reduceByKey删除,并且把map之后的RDD定义为一个state,配合这个state写状态更新方法
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming有状态的统计
* */
object StatefulWordCount {
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //使用状态算子必须要设置checkpoint
    //一般要保存记录在HDFS中
    ssc.checkpoint(".")
 
    val lines = ssc.socketTextStream("localhost", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1))  //不能用reduceByKey
 
    //连续更新状态
    val state = result.updateStateByKey(updateFunction _)  //需要隐式转换
    state.print()
 
    ssc.start()
    ssc.awaitTermination()
  }
 
  /*
  * 状态更新方法更新已有的数据,放在updateStateByKey中
  * */
  def updateFunction(currData: Seq[Int], prevData: Option[Int]): Option[Int] = {
 
    val curr = currData.sum  //算出当前的总次数
    val prev = prevData.getOrElse(0)  //读取已有的
 
    //返回已有和当前的和
    Some(curr + prev)
  }
}
 
 
  1. 统计结果写到MySQL中:
 
前提准备:
需要在IDEA中增加mysql的connector依赖
在mysql数据库中先创建一张表
写jdbc创建连接到Mysql
 
 
使用foreachRDD,有很多种错误的写法:(没有序列化,创建太多mysql连接等)
 
报错没有序列化:
 
dstream.foreachRDD {rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach {record =>connection.send(record) // executed at the worker
}
}
 
花太多开销在连接和断开数据库上
 
dstream.foreachRDD {rdd =>
rdd.foreach {record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
 
 
官方正确写法:
 
使用foreachPartition进行优化连接:
 
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
val connection = createNewConnection()  //创建mysql连接
partitionOfRecords.foreach(record =>
connection.send(record))
connection.close()
}
}
 
用连接池进行进一步优化:
 
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
 
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record =>connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
 
在写入MySQL数据时,应该作一个是否存在的判断:
若存在则使用update语句,不存在则使用insert语句
 
 
 
  1. Window的使用:
 
 
两个参数:
window length:窗口长度
sliding interval:窗口间隔
 
也就是每隔sliding interval统计前window length的值
 
API:countByWindow、reduceByWindow…
 
 
 
  1. 实战:黑名单过滤
 
transform算子的使用+Spark Streaming整合RDD操作
 
元组默认从1开始数
 
假设输入数据为id, name 这种形式
 
实现过程:
  1. 建立黑名单元组 => (name, true)
  2. 把输入数据流编程元组 => (name, (id, name))
  3. transform,把每个DStream变成一个个RDD操作
  4. 数据流的RDD与黑名单RDD进行leftjoin,获得新的元组
  5. filter判断过滤
  6. 整合输出
 
实现代码:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* 黑名单过滤demo
* */
object TransformApp {
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //构建黑名单列表, 实际应用中可在外面读取列表, 并转成RDD, 用true标记为是黑名单元组(name, true)
    val blacks = List("zs", "ls")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
 
    //获取每行
    val lines = ssc.socketTextStream("localhost", 6789)
    //把id, name => 元组(name, (id, name))
    //transform 的使用,对stream的每个RDD操作
    val filterResult = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      //与黑名单进行leftjoin => (name, ((id, name), true)), 并过滤出是true的项
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true)   //过滤出不等于true的
        .map(x => x._2._1)
    })
    filterResult.print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
  1. Spark Streaming整合Spark SQL
 
整合完成词频统计操作
 
官网代码:
 
 
就是foreachRDD把streaming转成RDD,然后toDF就可以进行DataFrame或者是sql的操作了
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
原文地址:https://www.cnblogs.com/kinghey-java-ljx/p/8544314.html