实时分析系统--SparkStreaming

第1章  SparkStreaming概述

1.1  Spark Streaming是什么

  Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:KafkaFlumeTwitterZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:mapreducejoinwindow等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

  和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStreamDStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名离散化”)所以简单来将,DStream就是对RDD在实时数据处理场景的一种封装。

1.2  Spark Streaming的特点

  • 易用

  • 容错

  • 易整合到Spark体系

1.3  Spark Streaming架构

1.3.1  架构图

  • 整体架构图

  • SparkStreaming架构图

1.3.2 背压机制

  Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

  为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

  通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

2Dstream入门

2.1 WordCount案例实操

  • 需求:使用netcat工具向3333端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

  1) 添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.1</version>
</dependency>

  2)添加 log4j.properties

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

  3) 编写代码

package com.yuange.sparkstreaming

package com.yuange.sparkstreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/2 17:02
 */
object WordCount {

  def main(args: Array[String]): Unit = {
    // ①创建编程入口,StreamingContext
    val streamingContext = new StreamingContext("local[2]","wc",Seconds(4))
    //②创建编程模型: DStream  根据不同的数据源创建不同的DS
    val ds: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop103",3333)
    // ③调用DS中的方法进行运算
    val result: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
    // ④调用行动算子,例如输出,打印等
    result.print(1000)
    // ⑤真正的计算,会在启动了app之后运行
    streamingContext.start()
    // ⑥流式应用,需要一直运行(类似agent) 不能让main运行完,阻塞main
    streamingContext.awaitTermination()
  }
}

  4) 启动程序并通过netcat发送数据:

nc -lk 3333

2.2 WordCount解析

  Discretized StreamSpark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据

  对数据的操作也是按照RDD为单位来进行的

  计算过程由Spark Engine来完成

3DStream创建

3.1 RDD队列

3.1.1 用法及说明

  测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

3.1.2 案例实操

  • 需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

  1) 编写代码

package com.yuange.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * @作者:袁哥
 * @时间:2021/7/2 19:26
 */
object RDDStream {

  def main(args: Array[String]): Unit = {
    //初始化Spark配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(conf,Seconds(4))
    //创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    //创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
    //处理队列中的RDD数据
    val reducedStream = inputStream.map((_,1)).reduceByKey(_ + _)
    //打印结果
    reducedStream.print()
    //启动任务
    ssc.start()
    //循环创建并向RDD队列中放入RDD
    for (i <- 1 to 4){
      rddQueue += ssc.sparkContext.makeRDD(1 to 300,10)
      Thread.sleep(1500)
    }
    ssc.awaitTermination()
  }
}

  2) 结果展示

3.2 自定义数据源

3.2.1 用法及说明

  需要继承Receiver,并实现onStartonStop方法来自定义数据源采集。

3.2.2 案例实操

  需求:自定义数据源,实现监控某个端口号,获取该端口号内容。

  1) 自定义数据源

package com.yuange.sparkstreaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
 * @作者:袁哥
 * @时间:2021/7/2 19:42
 */
class CustomerReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
    //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
    override def onStart(): Unit = {
      new Thread("Socket Receiver") {
        override def run() {
          receive()
        }
      }.start()
    }

    //读数据并将数据发送给Spark
    def receive(): Unit = {
      //创建一个Socket
      var socket: Socket = new Socket(host,port)
      //定义一个变量,用来接收端口传过来的数据
      var input: String = null
      //创建一个BufferedReader用于读取端口传来的数据

      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      //读取数据
      input = reader.readLine()

      //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
      while (!isStopped() && input != null) {
        store(input)
        input = reader.readLine()
      }

      //跳出循环则关闭资源
      reader.close()
      socket.close()
    }

    override def onStop(): Unit = {}
}

  2) 使用自定义的数据源采集数据

package com.yuange.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/2 19:54
 */
object FileStream {

  def main(args: Array[String]): Unit = {
    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf,Seconds(4))
    //创建自定义的receiver的Streaming
    val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop103", 3333))
    //将每行数据做切分,形成一个个单词
    val wordStream = lineStream.flatMap(_.split("	"))
    //将单词映射为元组
    val wordAndOneStream = wordStream.map((_,1))
    //6.将相同的单词次数做统计
    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
    //7.打印
    wordAndCountStream.print()
    //8.启动SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }

}

3.3 Kafka数据源

  ReceiverAPI(Receiver 模式):需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用

  DirectAPI(Direct模式(推荐)):是由计算的Executor来主动消费Kafka的数据,速度由自身控制

  官方文档:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

3.3.1 Direct模式官网版

  1)代码

package com.yuange.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

/**
 * @作者:袁哥
 * @时间:2021/7/3 14:00
 */
object KafkaDirectModeDemo {

  def main(args: Array[String]): Unit = {

    val streamingContext = new StreamingContext("local[2]","wc",Seconds(5))

    //配置消费者信息
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",  //集群地址
      "key.deserializer" -> classOf[StringDeserializer],    //Key-value的反序列化器
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yuange", //消费者组id
      "auto.offset.reset" -> "latest",  //消费者id(可选)
      "enable.auto.commit" -> "true"  //是否自动提交offset,如果要自动提交,多久自动提交1次
    )

    //指定消费主题
    val topics = Array("topicA")

    // 使用提供的API,从kafka中获取一个DS
    val ds = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /*
       *    位置策略:  指 Task允许的Executor 和 kafka的broker 的网络拓扑关系
       *            大部分情况, LocationStrategies.PreferConsistent
       *                            PreferConsistent : 大部分情况,公司用
       *                            PreferBrokers:  executor和broker在同一台机器
       *                            PreferFixed: 自定义
       */
      LocationStrategies.PreferConsistent,
      /*
       *     消费策略
       *              独立消费者:       不需要借助Kafka集群保存Offset
       *                                      assign
       *              非独立消费者(大部分):      需要kafka集群,采取分配策略为消费者组的每个消费者分配分区!
       *                                    需要借助kafka集群保存offset
       *                                      subscribe
       */
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    val ds1 = ds.map(recored => recored.value())
    ds1.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

  2)启动kafka生产者线程生产数据

bin/kafka-console-producer.sh --broker-list hadoop104:9092 --topic topicA

  3)查看结果

3.3.2 Direct模式演示丢失数据

  1)代码

package com.yuange.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * @作者:袁哥
 * @时间:2021/7/3 22:32
 *
 *     数据处理语义:
 *            at least once : 至少一次,可以是 1次或多次,存在重复处理!
 *            at most once(没人用) :  至多一次,可以是0次或1次, 存在丢数据风险!
 *            eaxctly once :  精确一次,不多不少
 *                    at least once  + 去重  = eaxctly once
 *     目前是自动提交offset,因此程序运行到49时,只是从kafka消费到了数据,还没有进行计算,就已经自动提交了offset!
 *     无论发生什么情况,都只能从提交的offset后,再消费,如果程序异常,异常期间的那批数据,也无法在程序重启后,再次消费到!
 */
object KafkaDirectLoseDataTest {

  def main(args: Array[String]): Unit = {
    val streamingContext = new StreamingContext("local[2]","wc",Seconds(5))

    //配置消费者信息
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",  //集群地址
      "key.deserializer" -> classOf[StringDeserializer],    //Key-value的反序列化器
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yuange", //消费者组id
      "auto.offset.reset" -> "latest",  //消费者id(可选)
      "enable.auto.commit" -> "true"  //是否自动提交offset,如果要自动提交,多久自动提交1次
    )

    //指定消费主题
    val topics = Array("topicA")

    // 使用提供的API,从kafka中获取一个DS
    val ds = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /*
       *    位置策略:  指 Task允许的Executor 和 kafka的broker 的网络拓扑关系
       *      大部分情况, LocationStrategies.PreferConsistent
       *           PreferConsistent : 大部分情况,公司用
       *           PreferBrokers:  executor和broker在同一台机器
       *           PreferFixed: 自定义
       */
      LocationStrategies.PreferConsistent,
      /*
       * 消费策略
       *   独立消费者-->assign:不需要借助Kafka集群保存Offset
       *   非独立消费者(大部分)-->subscribe:需要kafka集群,采取分配策略为消费者组的每个消费者分配分区!需要借助kafka集群保存offset
       */
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    //程序的运算逻辑
    val ds1 = ds.map(record =>{
      //模拟异常
      if (record.value().equals("d"))  1 / 0
      record.value()
    })
    ds1.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

  2)测试

bin/kafka-console-producer.sh --topic topicA --broker-list hadoop102:9092

  3)结果

3.3.3 Direct模式使用Checkpoint维护offset

  1)代码

package com.yuange.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * @作者:袁哥
 * @时间:2021/7/3 22:40
 *   不能自动提交offset,需要自己维护offset!
 *   如何自己维护?
 *        三种方式:①Checkpoints   ②Kafka itself   ③Your own data store
 *    Checkpoints:  checkpoint本质是一个持久化的文件系统!将kafka的偏移量存储在 spark提供的ck目录中,下次程序重启时,会从ck目录获取上次消费的offset,继续消费!
 *    操作: ①设置ck目录:streamingContext.checkpoint("kafkack")
 *           ②设置故障的时候,让Driver从ck目录恢复:
 *                  def getActiveOrCreate(
 *                        checkpointPath: String,       //ck目录
 *                        creatingFunc: () => StreamingContext  // 一个空参的函数,要求返回StreamingContext,函数要求把计算逻辑也放入次函数
 *                  ): StreamingContext
 *           ③取消自动提交offset:"enable.auto.commit" -> "false"
 *    Checkpoints的弊端:
 *          ①一般的异常,会catch住,继续运行,不给你异常后,从异常位置继续往后消费的机会
 *          ②重启后,会从上次ck目录记录的时间戳,一直按照 slide时间,提交Job,到启动的时间
 *          ③会产生大量的小文件
 *    结论:不推荐使用!因为不能保证精确一次!
 */
object KafkaDirectCKTest {

  val ckPath:String ="kafkack"

  def main(args: Array[String]): Unit = {

    def creatingStreamingContextFunc(): StreamingContext = {
      val streamingContext = new StreamingContext("local[2]","wc",Seconds(5))
      //设置ck目录
      streamingContext.checkpoint(ckPath)

      //配置消费者信息
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",  //集群地址
        "key.deserializer" -> classOf[StringDeserializer],    //Key-value的反序列化器
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "yuange", //消费者组id
        "auto.offset.reset" -> "latest",  //消费者id(可选)
        "enable.auto.commit" -> "false"  //是否自动提交offset,如果要自动提交,多久自动提交1次
      )

      //指定消费主题
      val topics = Array("topicA")

      // 使用提供的API,从kafka中获取一个DS
      val ds = KafkaUtils.createDirectStream[String, String](
        streamingContext,
        /*
         *    位置策略:  指 Task允许的Executor 和 kafka的broker 的网络拓扑关系
         *      大部分情况, LocationStrategies.PreferConsistent
         *           PreferConsistent : 大部分情况,公司用
         *           PreferBrokers:  executor和broker在同一台机器
         *           PreferFixed: 自定义
         */
        LocationStrategies.PreferConsistent,
        /*
         * 消费策略
         *   独立消费者-->assign:不需要借助Kafka集群保存Offset
         *   非独立消费者(大部分)-->subscribe:需要kafka集群,采取分配策略为消费者组的每个消费者分配分区!需要借助kafka集群保存offset
         */
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )

      //程序的运算逻辑
      val ds1 = ds.map(record =>{
        //模拟异常
        if (record.value().equals("d"))  throw new UnknownError("故障了!求你停下来吧!")
        record.value()
      })
      ds1.print()
      streamingContext
    }

    // 要么直接创建,要么从ck目录中恢复一个StreamingContext
    val context: StreamingContext = StreamingContext.getActiveOrCreate(ckPath,creatingStreamingContextFunc)

    context.start()
    context.awaitTermination()
  }
}

  2)测试

bin/kafka-console-producer.sh --topic topicA --broker-list hadoop102:9092

  3)结果

  4)将模拟异常的代码注释掉,然后再次启动,发现重启后,程序会从上次ck目录记录的时间戳,一直按照 slide时间,提交Job,到启动的时间

3.3.4 Direct模式使用Kafka维护offset

  1)代码

package com.yuange.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/5 8:19
 *    借助kafka提供的api,手动将offset存储到kafka的__consumer_offsets中
 *        ①取消自动提交offset
 *        ②获取当前消费到的这批数据的offset信息
 *        ③进行计算和输出
 *        ④计算和输出完全完成后,再手动提交offset
 **/
object KafkaDirectStoreOffsetToKafka {

  def main(args: Array[String]): Unit = {
    val streamingContext = new StreamingContext("local[2]","wc",Seconds(5))

    //配置消费者信息
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",  //集群地址
      "key.deserializer" -> classOf[StringDeserializer],    //Key-value的反序列化器
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yuange", //消费者组id
      "auto.offset.reset" -> "latest",  //消费者id(可选)
      "enable.auto.commit" -> "false"  //是否自动提交offset,如果要自动提交,多久自动提交1次
    )

    //指定消费主题
    val topics = Array("topicA")

    // 使用提供的API,从kafka中获取一个DS
    val ds = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /*
       *    位置策略:  指 Task允许的Executor 和 kafka的broker 的网络拓扑关系
       *      大部分情况, LocationStrategies.PreferConsistent
       *           PreferConsistent : 大部分情况,公司用
       *           PreferBrokers:  executor和broker在同一台机器
       *           PreferFixed: 自定义
       */
      LocationStrategies.PreferConsistent,
      /*
       * 消费策略
       *   独立消费者-->assign:不需要借助Kafka集群保存Offset
       *   非独立消费者(大部分)-->subscribe:需要kafka集群,采取分配策略为消费者组的每个消费者分配分区!需要借助kafka集群保存offset
       */
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    ds.foreachRDD { rdd =>
      //判断当前RDD是不是KafkaRDD,如果是,获取其中的偏移量信息
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //在Driver端中的foreachRDD,只有RDD的算子,才在Executor端运行
      //计算逻辑:如果有幂等操作,此时是 精确一次,如果没有幂等操作,此时就是 最少一次
      rdd.map(record => {
        //模拟异常
        //if (record.value().equals("d"))  1 / 0
        // 将数据写到redis或hbase,保证写N次,结果不变
        (record.value(),1)
      }).reduceByKey(_ + _).collect().foreach(println(_))

      //手动提交offset
      ds.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

  2)测试

  3)结果

3.3.5 Direct模式自己维护offset

  1)创建数据库

CREATE TABLE `offsets` (
  `groupid` varchar(100) DEFAULT NULL,
  `topic` varchar(100) DEFAULT NULL,
  `partitionid` int(5) DEFAULT NULL,
  `offset` bigint(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `wordcount` (
  `word` varchar(200) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  2)在idea中新建db.properties

jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=root123
jdbc.driver.name=com.mysql.jdbc.Driver

kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

  3)新建JDBCUtil 工具类

package com.yuange.utils

import java.sql.Connection

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * @作者:袁哥
 * @时间:2021/7/5 9:11
 */
object JDBCUtil {

  // 创建连接池对象
  var dataSource:DataSource = init()

  // 连接池的初始化
  def init():DataSource = {

    val paramMap = new java.util.HashMap[String, String]()
    paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name"))
    paramMap.put("url", PropertiesUtil.getValue("jdbc.url"))
    paramMap.put("username", PropertiesUtil.getValue("jdbc.user"))
    paramMap.put("password", PropertiesUtil.getValue("jdbc.password"))
    paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size"))

    // 使用Druid连接池对象
    DruidDataSourceFactory.createDataSource(paramMap)
  }

  // 从连接池中获取连接对象
  def getConnection(): Connection = {
    dataSource.getConnection
  }

  def main(args: Array[String]): Unit = {
    println(getConnection())
  }
}

  4)新建PropertiesUtil 工具类

package com.yuange.utils

import java.util.ResourceBundle

/**
 * @作者:袁哥
 * @时间:2021/7/5 9:12
 */
object PropertiesUtil {

  // 绑定配置文件
  // ResourceBundle专门用于读取配置文件,所以读取时,不需要增加扩展名
  // 国际化 = I18N => Properties
  val resourceFile: ResourceBundle = ResourceBundle.getBundle("db")

  def getValue( key : String ): String = {
    resourceFile.getString(key)
  }

  def main(args: Array[String]): Unit = {
    println(getValue("jdbc.user"))
  }
}

  5)代码

package com.yuange.kafka

import java.sql.{Connection, PreparedStatement, ResultSet}

import com.yuange.utils.JDBCUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * @作者:袁哥
 * @时间:2021/7/5 8:36
 *    自己维护offset:①取消自动提交offset
 *                    ②在程序开始计算之前,先从 mysql中读取上次提交的offsets
 *                    ③基于上次提交的offsets,构造一个DS,这个DS从上次提交的offsets位置向后消费的数据流
 *                          def SubscribePattern[K, V](
 *                                pattern: ju.regex.Pattern,
 *                                kafkaParams: collection.Map[String, Object],
 *                                offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V]
 *                     ④需要将结果,收集到Driver端,和offsets信息,组合为一个事务,一起写入数据库(若成功,提交,失败,就回滚!)
 *    Mysql中表的设计:
 *        运算的结果:单词统计 -->  result(单词 varchar, count int)
 *        offsets:offsets(group_id varchar, topic varchar, partition int , offset bigint)
 *    精准一次,借助事务
 */
object KafkaDirectStoreOffsetToMysql {

  val groupId:String ="yuange"
  val streamingContext = new StreamingContext("local[2]", "wc", Seconds(5))

  def main(args: Array[String]): Unit = {
    // 消费者的配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    // 查询之前已经在mysql中保存的offset
    val offsetsMap: Map[TopicPartition, Long] = readHitoryOffsetsFromMysql(groupId)
    //指定消费主题
    val topics = Array("topicA")
    // 基于上次提交的offsets,构造一个DS,这个DS从上次提交的offsets位置向后消费的数据流
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetsMap)
    )
    ds.foreachRDD { rdd =>
      //消费到了数据
      if (!rdd.isEmpty()){
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(println)
        //计算逻辑
        val result: Array[(String, Int)] = rdd.map(record => (record.value(), 1)).reduceByKey(_ + _).collect()
        // 开启事务,和offsets一起写入mysql
        writeResultAndOffsetsToMysql(result,offsetRanges)
      }
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }

  //从Mysql读取历史偏移量
  def readHitoryOffsetsFromMysql(groupId: String) : Map[TopicPartition, Long] = {
    val offsetsMap: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()

    var conn:Connection=null
    var ps:PreparedStatement=null
    var rs:ResultSet=null

    val sql:String=
      """
        |SELECT
        |  `topic`,`partitionid`,`offset`
        |FROM `offsets`
        |WHERE `groupid`=?
        |
        |""".stripMargin
    try {
      conn=JDBCUtil.getConnection()
      ps=conn.prepareStatement(sql)
      ps.setString(1,groupId)
      rs= ps.executeQuery()
      while(rs.next()){
        val topic: String = rs.getString("topic")
        val partitionid: Int = rs.getInt("partitionid")
        val offset: Long = rs.getLong("offset")
        val topicPartition = new TopicPartition(topic, partitionid)
        offsetsMap.put(topicPartition,offset)
      }
    }catch {
      case e:Exception =>
        e.printStackTrace()
        throw new RuntimeException("查询偏移量出错!")

    }finally {
      if (rs != null){
        rs.close()
      }
      if (ps != null){
        ps.close()
      }
      if (conn != null){
        conn.close()
      }
    }
    //将可变map转为不可变map
    offsetsMap.toMap
  }

  //在一个事务中,写入结果和偏移量
  def writeResultAndOffsetsToMysql(result: Array[(String, Int)], offsetRanges: Array[OffsetRange]): Unit = {
    val  sql1:String =
      """
        |INSERT INTO
        |    `wordcount` VALUES(?,?)
        |ON DUPLICATE KEY UPDATE `count`= COUNT + VALUES(COUNT)
        |
        |""".stripMargin

    val sql2:String =
      """
        |INSERT INTO
        |   `offsets` VALUES(?,?,?,?)
        |   ON DUPLICATE KEY UPDATE `offset`= VALUES(OFFSET)
        |
        |""".stripMargin
    var conn:Connection=null
    var ps1:PreparedStatement=null
    var ps2:PreparedStatement=null
    try {
      conn=JDBCUtil.getConnection()
      //取消事务的自动提交 ,只有取消了自动提交,才能将多次写操作组合为一个事务,手动提交
      conn.setAutoCommit(false)
      ps1=conn.prepareStatement(sql1)
      ps2=conn.prepareStatement(sql2)
      for ((word, count) <- result) {
        ps1.setString(1,word)
        ps1.setInt(2,count)
        ps1.addBatch()
      }
      //一批insert执行一次
      ps1.executeBatch()
      //模拟异常
      //1 / 0
      for (offsetRange <- offsetRanges) {
        ps2.setString(1,groupId)
        ps2.setString(2,offsetRange.topic)
        ps2.setInt(3,offsetRange.partition)
        ps2.setLong(4,offsetRange.untilOffset)
        ps2.addBatch()
      }
      ps2.executeBatch()
      //手动提交事务
      conn.commit()
    }catch {
      case e:Exception =>
        e.printStackTrace()
        //回滚事务
        conn.rollback()
        //重启app ,暂时以停止代替
        streamingContext.stop(true)
    }finally {
      if (ps1 != null){
        ps1.close()
      }
      if (ps2 != null){
        ps2.close()
      }
      if (conn != null){
        conn.close()
      }
    }
  }
}

  6)测试

bin/kafka-console-producer.sh --topic topicA --broker-list hadoop102:9092

  7)结果

4 DStream转换

  DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()transform()以及各种Window相关的原语。

4.1 无状态转化操作

  无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了表中。注意,针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。

  需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD上的。

  例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

4.1.1 Transform

  Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStreamAPI中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

package com.yuange.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/2 21:34
 */
object Transform {

  def main(args: Array[String]): Unit = {
    //创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //创建StreamingContext
    val ssc = new StreamingContext(sparkConf,Seconds(4))
    //创建DStream
    val ds: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103",3333)
    //转化为RDD
    ds.transform(rdd => {
      val words: RDD[String] = rdd.flatMap(_.split("	"))
      val wordAndOne: RDD[(String,Int)] = words.map((_ , 1))
      val values: RDD[(String,Int)] = wordAndOne.reduceByKey(_ + _)
      values
    })
    //打印
    ds.print()
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}

4.1.2 Join

  两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDDjoin效果相同。

package com.yuange.sparkstreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/2 21:51
 */
object JoinTest {

  def main(args: Array[String]): Unit = {
    val streamingContext = new StreamingContext("local[*]", "wc", Seconds(5))
    val ds1: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop103", 3333)
    val ds2: ReceiverInputDStream[String] = streamingContext.socketTextStream("hadoop103", 3334)
    //转换为K-V类型
    val ds3: DStream[(String, Int)] = ds1.map((_, 1))
    val ds4: DStream[(String, Int)] = ds2.map((_, 2))
    // 只能Join同一个批次的两个RDD中的数据
    val ds5: DStream[(String, (Int, Int))] = ds3.join(ds4)
    ds5.print(1000)
    // ⑤真正的计算,会在启动了app之后运行
    streamingContext.start()
    // ⑥流式应用,需要一直运行(类似agent) 不能让main运行完,阻塞main
    streamingContext.awaitTermination()
  }
}

4.2 有状态转化操作

4.2.1 UpdateStateByKey

  UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

  updateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。

  updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

    1. 定义状态,状态可以是一个任意的数据类型。

    2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

  使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

  1) 编写代码

package com.yuange.sparkstreaming

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/2 22:30
 */
object UpdateStateByKeyTest {

  def main(args: Array[String]): Unit = {
    // 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val ssc = new StreamingContext("local[*]","UpdateStateByKeyTest",Seconds(5))
    ssc.checkpoint("./ck")

    val word = ssc.socketTextStream("hadoop103",3333)
    // 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
    val stateDS = word.flatMap(_.split(" ")).map(r => (r,1)).updateStateByKey[Int](updateFunc)
    stateDS.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

  2) 启动程序并向3333端口发送数据

nc -lk 3333
Hello World
Hello Scala

  3) 结果展示

4.2.2 WindowOperations

  Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长

  • 窗口时长:计算内容的时间范围;
  • 滑动步长:隔多久触发一次计算。

  注意:这两者都必须为采集周期大小的整数倍。

  WordCount第三版:3秒一个批次,窗口12秒,滑步6秒。

package com.yuange.sparkstreaming

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/2 22:41
 */
object WindowOperations {

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[*]","WindowOperations",Seconds(3))
    ssc.checkpoint("./ck")
    val word = ssc.socketTextStream("hadoop103",3333)
    val word2 = word.flatMap(_.split(" ")).map(r => (r,1)).reduceByKeyAndWindow((a: Int,b: Int) => (a+b),Seconds(12),Seconds(6))
    word2.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

  关于Window的操作还有如下方法:

    (1window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream

    (2countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

    (3reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

    (4reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个keyvalue值。

    (5reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  {(x, y) => x + y},
  {(x, y) => x - y},
  Seconds(30),
  Seconds(10))
  //加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长

  countByWindow()countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的DStream,而countByValueAndWindow()返回的DStream则包含窗口中每个值的个数。

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) 
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

5DStream输出

  输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

输出操作如下:

  • print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()
  • saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefixsuffix。”prefix-Time_IN_MS[.suffix]”。
  • saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
  • saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"Python API 中目前不可用。
  • foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。

  通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。

  注意:

    1) 连接不能写在driver层面(序列化)

    2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

    3) 增加foreachPartition,在分区创建(获取)。

6章 优雅关闭

  流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。

  使用外部文件系统来控制内部程序关闭。

  1)MonitorStop

package com.yuange.sparkstreaming


import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

/**
 * @作者:袁哥
 * @时间:2021/7/3 11:28
 */
class MonitorStop(ssc: StreamingContext) extends Runnable{
  override def run(): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"),new Configuration(),"atguigu")
    while (true) {
      try{
        Thread.sleep(5000)
      }catch {
        case e: Exception => {
          e.printStackTrace()
        }
      }
      val state: StreamingContextState = ssc.getState
      val bool: Boolean = fs.exists(new Path("hdfs://hadoop102:8020/stopSpark"))

      if (bool){
        if (state == StreamingContextState.ACTIVE){
          ssc.stop(stopSparkContext = true,stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

  2)SparkTest

package com.yuange.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @作者:袁哥
 * @时间:2021/7/3 11:38
 */
object SparkTest {

  def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
    val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int],status: Option[Int]) => {
      //当前批次的内容
      val sum: Int = values.sum
      //取出状态信息中上一次状态
      val lastStatu: Int = status.getOrElse(0)
      Some(sum + lastStatu)
    }
    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
    //设置优雅的关闭
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    ssc.checkpoint("./ck")

    val word: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103",3333)

    val word2 = word.flatMap(_.split(" ")).map((_,1)).updateStateByKey(update)
    word2.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck",() => createSSC())

    new Thread(new MonitorStop(ssc)).start()

    ssc.start()
    ssc.awaitTermination()
  }
}
原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/14961820.html