SparkStreaming

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。

离线数据:不可改变数据;实时数据:改变对数据;  流式处理批量处理

批量(微批次,不是流式处理)

什么是DStream

    DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据;离散流, 一个或多个RDD

SparkStreaming架构

WordCount案例

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

StreamingContext中有这个构造方法: def this(conf: SparkConf, batchDuration: Duration)
//测试Spark实时计算
object StreamWordCount {
  def main(args: Array[String]): Unit = {

    //创建配置对象
    val conf: SparkConf = new SparkConf().setAppName("Streaming").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))
    //通过监控端口创建DStream,读进来的数据为一行行
    val socket: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)
    //将每一行数据做切分,形成一个个单词  读取是按一行一行来读 line ==> word
    val dsTream: DStream[String] = socket.flatMap(_.split(" "))
    //将单词映射成元组(word,1)
    val word: DStream[(String, Int)] = dsTream.map((_, 1))
    //reduceByKey
    val wordCount: DStream[(String, Int)] = word.reduceByKey(_+_)
      //打印
    wordCount.print()
    //启动采集器
    streamContext.start()
    //Driver不能停止,等待采集器的结束
    streamContext.awaitTermination()
  }
[kris@hadoop101 ~]$ nc -lk 9999
Hello world
Hello 
Hello java
Hello spark

如果程序运行时,log日志太多,可以将spark conf目录下的log4j文件放中resources里边,日志级别改成ERROR

DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据

1. 文件数据源

文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取,Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。

streamingContext.textFileStream(dataDirectory), 其他与上边代码相同;

注意事项:

1)文件需要有相同的数据格式;

2)文件进入 dataDirectory的方式需要通过移动或者重命名来实现;

3)一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;

2. 自定义数据源

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。  自定义数据源,实现监控某个端口号,获取该端口号内容。

 自定义数据采集器:

// 自定义数据采集器
class CustomerReceive(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ //有一个构造方法
  var socket: Socket = null
  //读数据并将数据发送给Spark
  def receive(): Unit = {
    //创建一个Socket
    val socket = new Socket(host, port)
    //字节流 ---->字符流
    val inputStream: InputStream = socket.getInputStream //字节流
    //字符流
    val bufferedReader: BufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"))
    var line: String = null
    while ((line = bufferedReader.readLine()) != null){
      if (!"--END--".equals(line)){
        store(line) //存储到这里边
      }else{
        return
      }
    }
  }
  //启动采集器
  //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
  override def onStart(): Unit = {
    new Thread(new Runnable{
      override def run(): Unit = {
        receive()
      }
    }).start()
  }
  //关闭采集器
  override def onStop(): Unit = {
    if (socket != null){
      socket.close()
      socket = null
    }
  }

}
//测试:
object FileStream {
  def main(args: Array[String]): Unit = {
    // 创建流式处理环境对象
    // 创建对象时,需要传递采集数据的周期(时间)
    val conf: SparkConf = new SparkConf().setAppName("Streaming").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))

    // 从端口号获取数据
    val socketDStream: ReceiverInputDStream[String] = streamContext.receiverStream(new CustomerReceive("hadoop101", 9999))
    // 一行一行的数据 line ==> word
    val wordDStream: DStream[String] = socketDStream.flatMap(_.split(" "))
    // word ==> (word, 1)
    val wordToCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    // reduceByKey
    val wordToSumDStream: DStream[(String, Int)] = wordToCountDStream.reduceByKey(_ + _)
    //打印数据
    wordToSumDStream.print()
    // TODO 启动采集器
    streamContext.start()
    // TODO Driver不能停止,等待采集器的结束
    // wait, sleep
    streamContext.awaitTermination()

  }
}

3. Kafka数据源(重点)

KafkaUtils 对象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息创建出 DStream。由于 KafkaUtils 可以订阅多个主题,因此它创建出的 DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用 createStream() 方法。

//监听kafka消息
object KafkaStreaming {
  def main(args: Array[String]): Unit = {
    // 创建配置对象
    val sparkConf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
    // 创建流式处理环境对象
    // 创建对象时,需要传递采集数据的周期(时间)
    val socket: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    // 一个类如果创建SparkContext,那么这个类我们称之为Driver类
    // 从Kafka集群中获取数据
    //定义kafka参数
    val kafkaParams = Map[String, String](
      "group.id" -> "kris",
      "zookeeper.connect" -> "hadoop101:2181",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->"org.apache.kafka.common.serialization.StringDeserializer",//StringDeserializer的全类名,StringDeserializer implements Deserializer<String>
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    ) //别导错包流,是kafka.clients.consumer里对
    //定义topic参数
    val topicMap = Map("thrid" -> 3)

    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      socket,
      kafkaParams,
      topicMap,
      StorageLevel.MEMORY_ONLY) //StorageLevel别导错包流
    val wordToCountDStream = kafkaDStream.map {
      case (k, v) => {(v, 1)}
    }
    val wordToSumDStream: DStream[(String, Int)] = wordToCountDStream.reduceByKey(_ + _)
    //打印数据
    wordToSumDStream.print()
    //启动采集器
    socket.start()
    //Driver不能停,等待采集器对结束
    socket.awaitTermination()
  }
}

启动kafka并中控制台启动一个生产者

[kris@hadoop101 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic thrid

打印:

-------------------------------------------
Time: 1555065970000 ms
-------------------------------------------
(Hello world,1)

-------------------------------------------
Time: 1555065975000 ms
-------------------------------------------
(Hello,1)

-------------------------------------------
Time: 1555065980000 ms
-------------------------------------------
(Hello,1)
(java,1)

-------------------------------------------
Time: 1555065985000 ms
-------------------------------------------
(spark,1)

-------------------------------------------
View Code

DStream转换

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

4. 有状态转化操作(重点)

 UpdateStateByKey

UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步: 
  1. 定义状态,状态可以是一个任意的数据类型。 
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。(key只要相同它的状态就会更新

key单词相同了就会形成一个数量对集合,Seq[Int]就是那个数量(比如Hello, 1; Hello, 1;Seq即1 1 1);Option只有两个值(some有值和none没值),为了解决空指针出现对类,不用判断当前对象是否为空,可直接使用option

更新状态:多条数据之间是否有关系, 有状态 无状态
周期间采集数据是无状态的,但实时数据需要是有状态的,与checkPoint做聚合-->就有状态了

把数据保持中CheckPoint,buffer临时对缓冲

//SparkStreaming有状态转换操作
object DStreamState {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Stream").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))
    //设置Checkpoints的目录
    streamContext.sparkContext.setCheckpointDir("cp")
    val socketDStream: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

    val wordDStream: DStream[String] = socketDStream.flatMap(_.split(" "))

    val wordToCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
//    进行有状态的转换操作
    val resultDStream: DStream[(String, Long)] = wordToCountDStream.updateStateByKey {// 要加范型
      case (seq, buffer) => {  //seq序列当前周期中单词对数量对集合, buffer表缓冲当中的值,所谓的checkPoint
        val sumCount = seq.sum + buffer.getOrElse(0L)
        Option(sumCount) //表往缓存里边更新对值  它需要返回一个Option
      }
    }
    resultDStream.print()
    streamContext.start()
    streamContext.awaitTermination()

  }
}

打印:

有状态转换操作
-------------------------------------------
Time: 1555070600000 ms
-------------------------------------------
(Hello,1)
(world,1)

-------------------------------------------
Time: 1555070605000 ms
-------------------------------------------
(Hello,2)
(world,2)

-------------------------------------------
Time: 1555070610000 ms
-------------------------------------------
(Hello,3)
(java,1)
(world,2)

-------------------------------------------
Time: 1555070615000 ms
-------------------------------------------
(Hello,3)
(java,1)
(world,2)
View Code

Window Operations

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

窗口数据是指一段时间范围内的数据作为一个整体使用,随着时间的推移,窗口数据也会发生变化,这样的函数称为窗口函数,并且这个窗口可以发生变化,也称为滑动窗口

object DStreamWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Stream").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(3))

    val socketDStream: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

    // 设定数据窗口:window
    // 第一个参数表示窗口的大小(时间的范围,应该为采集周期的整数倍)
    // 第二个参数表示窗口的滑动的幅度(时间的范围,应该为采集周期的整数倍)
    val windowDStream: DStream[String] = socketDStream.window(Seconds(6), Seconds(3))
    val wordDStream: DStream[String] = windowDStream.flatMap(_.split(" "))
    val wordCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    val wordSumDStream: DStream[(String, Int)] = wordCountDStream.reduceByKey(_+_)

    wordSumDStream.print()
    streamContext.start()
    streamContext.awaitTermination()
  }
}

Transform

Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

Transform和map对区别:

    // TODO XXXXXX (Drvier) * 1,这里可写Driver代码但只执行一遍;
    wordSumDStream.map{
      case(word, sum) => {
        // TODO YYYYYY (Executor) * N ,这里执行的是Executor代码可执行N遍
        (word, 1)
      }
    }
    // transform可以将DStream包装好的RDD抽取出来进行转换操作
    // transform可以在每一个采集周期对rdd进行操作
  // TODO AAAAAA (Driver) * 1
    wordSumDStream.transform{
      rdd => {
        // TODO BBBBBBB (Driver) * N
        rdd.map{
          case (word, sum) => {
            // TODO CCCCCC (Executor) * N
            (word, 1)
          }
        }
      }
    }

DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

输出操作如下:

(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。

(2)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”. 

(3)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

(4)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。
Python API Python中目前不可用。

(5)foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。

通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。

比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意:

(1)连接不能写在driver层面;

(2)如果写在foreach则每个RDD都创建,得不偿失;

(3)增加foreachPartition,在分区创建。

 

原文地址:https://www.cnblogs.com/shengyang17/p/10699511.html