3Spark学习笔记3

SparkStreaming

SparkStreaming概述

DStream入门

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_WordCount {

  /**
   * Counts the words arriving on a local TCP socket, printing one result per batch.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // Spark configuration and a streaming context that cuts a batch every 3 seconds.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamWordCount")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // Every element read from the monitored port is one line of text.
    val counts = streamingContext
      .socketTextStream("localhost", 9999)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((left, right) => left + right)

    counts.print()

    // The collector is a long-running task, so it must not be stopped right away,
    // and main must not return or the application would end with it:
    // start the collector, then block until it terminates.
    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

DStream创建

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Queue {

  /**
   * Builds a DStream from an in-memory queue of RDDs and counts how often each value occurs.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamWordCount")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // RDDs appended to this queue become the input of the stream.
    val pending = new mutable.Queue[RDD[Int]]()

    val counts = streamingContext
      .queueStream(pending, oneAtATime = false)
      .map(value => (value, 1))
      .reduceByKey((left, right) => left + right)

    counts.print()

    // Start the collector before feeding the queue.
    streamingContext.start()

    // Push five identical RDDs (1 to 300, 10 partitions), pausing 2 seconds after each.
    (1 to 5).foreach { _ =>
      pending += streamingContext.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    // Block until the collector is shut down.
    streamingContext.awaitTermination()
  }

}

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.util.Random

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_DIY {

  /**
   * Runs a streaming job whose input comes from the custom [[MyReceiver]] and prints each batch.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    // Streaming context with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val messageDS = ssc.receiverStream(new MyReceiver())

    messageDS.print()

    // Start the collector
    ssc.start()

    // Block until the collector is shut down
    ssc.awaitTermination()
  }

  /**
   * Custom data collector.
   *
   *  1. Extend Receiver, fix the element type and pass the storage level.
   *  2. Override onStart / onStop.
   *
   * Emits a message containing a random digit roughly every 500 ms until stopped.
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    // Written by onStart/onStop and read by the worker thread, so it must be
    // @volatile for the worker to observe the stop request.
    @volatile private var flg = true

    /** Spawns the worker thread that generates and stores messages. */
    override def onStart(): Unit = {
      // Re-arm the flag: after a previous onStop() it is false, and without this
      // a restarted receiver would start a thread that exits immediately.
      flg = true
      // One generator for the lifetime of the worker instead of one per message.
      val random = new Random()
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flg) {
            val message = "采集的数据为" + random.nextInt(10).toString
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }

    /** Asks the worker thread to leave its loop. */
    override def onStop(): Unit = {
      flg = false
    }
  }
}

package com.lotuslaw.spark.streaming

import java.util.Random

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_KafKa {

  /**
   * Reads string records from a Kafka topic through a direct stream and prints their values.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // Kafka consumer settings: brokers, consumer group and string deserializers.
    val consumerSettings = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val topics = Set("lotuslawNew")

    val records: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, consumerSettings)
      )

    // Only the record value is of interest here.
    records.map(record => record.value()).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

DStream转换

package com.lotuslaw.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_State {

  /**
   * Keeps a running count per input line across batches using updateStateByKey.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    import StreamingContext._
    // Stateful operations need a checkpoint path to be configured.
    ssc.checkpoint("cp")

    // Stateless operations only see the data of the current batch interval.
    // When results must be accumulated over time, the statistics (the state)
    // have to be kept between batches.
    val pairs = ssc
      .socketTextStream("localhost", 9999)
      .map(line => (line, 1))

    // updateStateByKey updates the state of each key.
    // The function receives two values:
    //   1. the values seen for the key in the current batch
    //   2. the value buffered for the same key so far
    val runningTotals = pairs.updateStateByKey(
      (batchValues: Seq[Int], previous: Option[Int]) =>
        Option(previous.getOrElse(0) + batchValues.sum)
    )

    runningTotals.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Transform {

  /**
   * Contrasts `transform` with `map` to show where each piece of code runs.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)

    // transform exposes the underlying RDD so it can be manipulated directly. Use it when
    //   1. the DStream API lacks the operation you need, or
    //   2. some code has to be executed periodically.
    val viaTransform = lines.transform { batch =>
      // Code here: driver side, executed periodically
      batch.map { line =>
        // Code here: executor side
        line
      }
    }

    // Code here: driver side
    val viaMap = lines.map { line =>
      // Code here: executor side
      line
    }

    viaTransform.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_State_Join {

  /**
   * Joins the lines read from two local sockets on the line text.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Tag each line with a marker for the port it came from.
    val fromPort9999 = ssc.socketTextStream("localhost", 9999).map(line => (line, 9))
    val fromPort8888 = ssc.socketTextStream("localhost", 8888).map(line => (line, 8))

    // A DStream join is really just a join of the two underlying RDDs.
    val joined = fromPort9999.join(fromPort8888)

    joined.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Window {

  /**
   * Counts lines over a 6-second window that slides by 6 seconds.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    // The window length should be a whole multiple of the batch interval.
    // Windows slide, and by default they slide by one batch interval, which can
    // make the same data be counted more than once; choosing the slide step
    // explicitly avoids that.
    val windowLength = Seconds(6)
    val slideStep = Seconds(6)

    val counts = ssc
      .socketTextStream("localhost", 9999)
      .map(line => (line, 1))
      .window(windowLength, slideStep)
      .reduceByKey((left, right) => left + right)

    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Window1 {

  /**
   * Counts lines over a 9-second window sliding by 3 seconds, using the
   * incremental (add / subtract) form of reduceByKeyAndWindow.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("cp")
    import StreamingContext._

    val pairs = ssc
      .socketTextStream("localhost", 9999)
      .map(line => (line, 1))

    // reduceByKeyAndWindow: when the window is large but the slide step is small,
    // the result can be maintained by adding the data that enters the window and
    // removing the data that leaves it, instead of recomputing everything.
    val windowedCounts = pairs.reduceByKeyAndWindow(
      (total: Int, entering: Int) => total + entering,
      (total: Int, leaving: Int) => total - leaving,
      Seconds(9),
      Seconds(3)
    )

    windowedCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

DStream输出

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Output {

  /**
   * Builds a windowed count but deliberately registers no output operation,
   * to show what happens when a streaming job has none.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("cp")

    val pairs = ssc
      .socketTextStream("localhost", 9999)
      .map(line => (line, 1))

    val windowedCounts = pairs.reduceByKeyAndWindow(
      (total: Int, entering: Int) => total + entering,
      (total: Int, leaving: Int) => total - leaving,
      Seconds(9),
      Seconds(3)
    )

    // If a Spark Streaming job has no output operation, an error is reported.
    // windowedCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Output1 {

  /**
   * Same windowed count as the print-based example, but uses foreachRDD
   * (with an empty body) as the output operation.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("cp")

    val pairs = ssc
      .socketTextStream("localhost", 9999)
      .map(line => (line, 1))

    val windowedCounts = pairs.reduceByKeyAndWindow(
      (total: Int, entering: Int) => total + entering,
      (total: Int, leaving: Int) => total - leaving,
      Seconds(9),
      Seconds(3)
    )

    // If a Spark Streaming job has no output operation, an error is reported.
    // windowedCounts.print()
    // With foreachRDD no timestamp is shown.
    windowedCounts.foreachRDD { batch =>
      ()
    }

    ssc.start()
    ssc.awaitTermination()
  }

}

优雅关闭

package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */
object Spark_Stream_Close {

  /**
   * Starts a socket stream job and shuts it down gracefully from a helper thread
   * five seconds after start.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    /*
       Stopping a plain thread:
         val thread = new Thread()
         thread.start()
         thread.stop() // forced stop
     */

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("cp")

    ssc
      .socketTextStream("localhost", 9999)
      .map(line => (line, 1))
      .print()

    ssc.start()

    // Stopping the collector has to happen on a separate thread, and the
    // "please stop" signal should live in a third-party system.
    val shutdownTask = new Runnable {
      override def run(): Unit = {
        // Graceful shutdown: the compute nodes stop accepting new data, finish
        // what they already have, and only then shut down.
        // Possible places to keep the stop signal:
        //   MySQL : Table(stopSpark) => Row => data
        //   Redis : Data(K-V)
        //   ZK    : /stopSpark
        //   HDFS  : /stopSpark
        /*
           Polling variant:
             while ( true ) {
                 if (true) {
                     // read the streaming context state
                     val state: StreamingContextState = ssc.getState()
                     if ( state == StreamingContextState.ACTIVE ) {
                         ssc.stop(true, true)
                     }
                 }
                 Thread.sleep(5000)
             }
         */
        Thread.sleep(5000)
        if (ssc.getState() == StreamingContextState.ACTIVE) {
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }
        System.exit(0)
      }
    }
    new Thread(shutdownTask).start()

    ssc.awaitTermination()
  }

}
package com.lotuslaw.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// 恢复
object Spark_Stream_Resume {

  // Directory used both to recover an earlier context and to checkpoint a new one.
  private val CheckpointDir = "cp"

  /**
   * Resumes the streaming job from the checkpoint directory when possible,
   * otherwise builds a fresh one, then runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val ssc = StreamingContext.getActiveOrCreate(CheckpointDir, () => createContext())

    ssc.start()
    ssc.awaitTermination()

  }

  /**
   * Builds a brand-new streaming context with its full processing graph.
   *
   * The checkpoint directory is configured here, inside the factory, so that it
   * is applied only to a freshly created context and the whole setup lives in
   * one place.
   *
   * @return a configured, not yet started streaming context
   */
  private def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint(CheckpointDir)

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_, 1))
    wordToOne.print()
    ssc
  }

}

SparkStreaming案例

package com.lotuslaw.spark.util

import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.util
 * @create: 2021-12-03 12:37
 * @description:
 */
object JDBCUtil {

  // Connection pool, created when this object is initialised.
  var dataSource: DataSource = init()

  /**
   * Builds the Druid connection pool from hard-coded MySQL settings.
   *
   * @return the pooled data source
   */
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    // The URL parameter is "useUnicode"; it was previously misspelled "userUnicode".
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8")
    properties.setProperty("username", "root")
    properties.setProperty("password", "*******")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  /** Borrows a MySQL connection from the pool. */
  def getConnection: Connection = {
    dataSource.getConnection
  }

  /**
   * Executes a single insert/update/delete statement and commits it.
   *
   * Auto-commit is switched off on the given connection. On failure the stack
   * trace is printed, the transaction is rolled back and 0 is returned.
   *
   * @param connection connection to run the statement on; not closed here
   * @param sql        statement text with `?` placeholders
   * @param params     placeholder values in order; may be null or empty
   * @return the number of affected rows, or 0 when the statement failed
   */
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    try {
      connection.setAutoCommit(false)
      val pstmt = connection.prepareStatement(sql)
      try {
        bindParams(pstmt, params)
        val affected = pstmt.executeUpdate()
        connection.commit()
        affected
      } finally {
        // Always release the statement, also when execute/commit throws.
        closeAndReport(pstmt)
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        // Do not leave a half-done transaction open on the caller's connection.
        try connection.rollback() catch {
          case rollbackError: Exception => rollbackError.printStackTrace()
        }
        0
    }
  }

  /**
   * Tells whether a query returns at least one row.
   *
   * @param connection connection to run the query on; not closed here
   * @param sql        query text with `?` placeholders
   * @param params     placeholder values in order; may be null or empty
   * @return true when the query yields a row; false when it does not or when it failed
   */
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
    try {
      val pstmt = connection.prepareStatement(sql)
      try {
        bindParams(pstmt, params)
        val rs = pstmt.executeQuery()
        try rs.next() finally closeAndReport(rs)
      } finally {
        closeAndReport(pstmt)
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        false
    }
  }

  // Binds the values to the statement's placeholders; a null array binds nothing.
  private def bindParams(pstmt: PreparedStatement, params: Array[Any]): Unit = {
    if (params != null) {
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
    }
  }

  // Closes a JDBC resource, printing (not propagating) any failure to close it.
  private def closeAndReport(resource: AutoCloseable): Unit = {
    try resource.close() catch {
      case e: Exception => e.printStackTrace()
    }
  }

}
package com.lotuslaw.spark.streaming

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// Mock data generator: writes simulated ad-click records to Kafka
object Spark_Stream_Req_MockData {

  /**
   * Endlessly sends batches of simulated ad-click records to Kafka, echoing
   * each record to stdout and pausing 2 seconds between batches.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // Record format  : timestamp area city userid adid
    // Pipeline       : Application => Kafka => SparkStreaming => Analysis

    // Producer configuration
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](prop)

    // The loop below never ends on its own, so close the producer when the JVM
    // is shut down instead of never closing it.
    sys.addShutdownHook(producer.close())

    while (true) {
      mockData().foreach(
        data => {
          // Publish the record to Kafka
          val record = new ProducerRecord[String, String]("lotuslawNew", data)
          producer.send(record)
          println(data)
        }
      )
      Thread.sleep(2000)
    }

  }

  /**
   * Generates a random batch of 0 to 49 click records.
   *
   * Each record is "timestamp area city userid adid", where the timestamp is the
   * current time in milliseconds and userid / adid are in the range 1 to 6.
   *
   * @return the generated records
   */
  def mockData(): ListBuffer[String] = {
    // One generator for the whole batch instead of several per record.
    val random = new Random()
    val areaList = Vector("华北", "华东", "华南")
    val cityList = Vector("北京", "上海", "深圳")

    ListBuffer.fill(random.nextInt(50)) {
      // Bounds follow the list sizes so the lists can grow without touching this code.
      val area = areaList(random.nextInt(areaList.size))
      val city = cityList(random.nextInt(cityList.size))
      val userid = random.nextInt(6) + 1
      val adid = random.nextInt(6) + 1

      s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}"
    }
  }

}
package com.lotuslaw.spark.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SPARK_BRANCH, SparkConf}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// Requirement 1, step 1: consume the click records from Kafka and print them
object Spark_Stream_Req1 {

  /**
   * Subscribes to the click topic through a direct Kafka stream and prints each record value.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // Kafka consumer settings: brokers, consumer group and string deserializers.
    val consumerSettings = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val topics = Set("lotuslawNew")

    val records = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, consumerSettings)
    )

    records.map(record => record.value()).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}
package com.lotuslaw.spark.streaming

import com.lotuslaw.spark.util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.sql.ResultSet
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// Requirement 1: ad-click blacklist (one JDBC connection per record)
object Spark_Stream_Req1_BlackList {

  // Clicks per (day, user, ad) at which the user is put on the blacklist.
  private val ClickThreshold = 30

  private val InsertBlackListSql =
    """
      |insert into black_list (userid) values (?)
      |on DUPLICATE KEY
      |UPDATE userid = ?
      |""".stripMargin

  private val SelectCountRowSql =
    """
      | select
      |     *
      | from user_ad_count
      | where dt = ? and userid = ? and adid = ?
      |""".stripMargin

  private val AddClicksSql =
    """
      | update user_ad_count
      | set count = count + ?
      | where dt = ? and userid = ? and adid = ?
      |""".stripMargin

  private val SelectOverThresholdSql =
    """
      |select
      |    *
      |from user_ad_count
      |where dt = ? and userid = ? and adid = ? and count >= ?
      |""".stripMargin

  private val InsertCountRowSql =
    """
      | insert into user_ad_count ( dt, userid, adid, count ) values ( ?, ?, ?, ? )
      |""".stripMargin

  /**
   * Consumes ad-click records from Kafka, drops clicks of blacklisted users,
   * counts the remaining clicks per (day, user, ad) in every batch, and then
   * either blacklists the user or accumulates the count in MySQL.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("lotuslawNew"), kafkaPara)
    )

    // Record layout: timestamp area city userid adid (space separated).
    val adClickData = kafkaDataDS.map { kafkaData =>
      val datas = kafkaData.value().split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    }

    val ds = adClickData.transform { rdd =>
      // Driver side, once per batch: re-read the blacklist so newly added users are honoured.
      // A Set gives constant-time lookups in the filter below.
      val blackList = loadBlackList()

      rdd
        // Keep only clicks of users that are not blacklisted.
        .filter(data => !blackList.contains(data.user))
        // Count the clicks per (day, user, ad) within this batch.
        .map { data =>
          val sdf = new SimpleDateFormat("yyyy-MM-dd")
          val day = sdf.format(new java.util.Date(data.ts.toLong))
          ((day, data.user, data.ad), 1)
        }
        .reduceByKey(_ + _)
    }

    ds.foreachRDD { rdd =>
      rdd.foreach { case ((day, user, ad), count) =>
        println(s"${day} ${user} ${ad} ${count}")
        // One connection per record, always handed back even when a statement fails.
        withConnection(conn => recordClicks(conn, day, user, ad, count))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Applies the blacklist rule to one aggregated (day, user, ad) count.
  private def recordClicks(conn: java.sql.Connection, day: String, user: String, ad: String, count: Int): Unit = {
    if (count >= ClickThreshold) {
      // The batch alone reaches the threshold: blacklist the user.
      update(conn, InsertBlackListSql, user, user)
    } else if (exists(conn, SelectCountRowSql, day, user, ad)) {
      // A row for today exists: accumulate, then re-check the threshold on the stored total.
      update(conn, AddClicksSql, count, day, user, ad)
      if (exists(conn, SelectOverThresholdSql, day, user, ad, ClickThreshold)) {
        update(conn, InsertBlackListSql, user, user)
      }
    } else {
      // First clicks of the day for this (user, ad): create the row.
      update(conn, InsertCountRowSql, day, user, ad, count)
    }
  }

  // Reads every blacklisted user id from MySQL.
  private def loadBlackList(): Set[String] =
    withConnection { conn =>
      prepared(conn, "select userid from black_list", Seq.empty) { stmt =>
        val rs = stmt.executeQuery()
        try Iterator.continually(rs).takeWhile(_.next()).map(_.getString(1)).toSet
        finally rs.close()
      }
    }

  // Runs the body with a pooled connection and closes the connection afterwards, even on failure.
  private def withConnection[T](body: java.sql.Connection => T): T = {
    val conn = JDBCUtil.getConnection
    try body(conn) finally conn.close()
  }

  // Prepares the statement, binds the values in order, runs the body and always closes the statement.
  private def prepared[T](conn: java.sql.Connection, sql: String, params: Seq[Any])
                         (body: java.sql.PreparedStatement => T): T = {
    val stmt = conn.prepareStatement(sql)
    try {
      params.zipWithIndex.foreach { case (value, index) => stmt.setObject(index + 1, value) }
      body(stmt)
    } finally {
      stmt.close()
    }
  }

  // Executes an insert/update and returns the number of affected rows.
  private def update(conn: java.sql.Connection, sql: String, params: Any*): Int =
    prepared(conn, sql, params)(stmt => stmt.executeUpdate())

  // Tells whether the query returns at least one row.
  private def exists(conn: java.sql.Connection, sql: String, params: Any*): Boolean =
    prepared(conn, sql, params) { stmt =>
      val rs = stmt.executeQuery()
      try rs.next() finally rs.close()
    }

  // One ad click: timestamp (epoch millis as text), area, city, user id, ad id.
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
package com.lotuslaw.spark.streaming

import com.lotuslaw.spark.util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.ResultSet
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// Requirement 1: ad-click blacklist, refactored to use JDBCUtil and foreachPartition
object Spark_Stream_Req1_BlackList1 {

  /**
   * Consumes ad-click records from Kafka, drops clicks of blacklisted users,
   * counts the remaining clicks per (day, user, ad) in every batch, and then
   * either blacklists the user or accumulates the count in MySQL, using one
   * JDBC connection per partition.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("lotuslawNew"), kafkaPara)
    )

    // Record layout: timestamp area city userid adid (space separated).
    val adClickData = kafkaDataDS.map { kafkaData =>
      val datas = kafkaData.value().split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    }

    val ds = adClickData.transform { rdd =>
      // Driver side, once per batch: re-read the blacklist so newly added users are honoured.
      // A Set gives constant-time lookups in the filter below.
      val blackList = loadBlackList()

      rdd
        // Keep only clicks of users that are not blacklisted.
        .filter(data => !blackList.contains(data.user))
        // Count the clicks per (day, user, ad) within this batch.
        .map { data =>
          val sdf = new SimpleDateFormat("yyyy-MM-dd")
          val day = sdf.format(new java.util.Date(data.ts.toLong))
          ((day, data.user, data.ad), 1)
        }
        .reduceByKey(_ + _)
    }

    ds.foreachRDD { rdd =>
      // rdd.foreach would open a connection for every record. A connection cannot
      // be created on the driver and shipped to the executors either, because the
      // closure is serialized and connection objects are not serializable.
      // foreachPartition lets each partition open a single connection on the
      // executor side, which greatly reduces the number of connections.
      rdd.foreachPartition { iter =>
        // Skip empty partitions so they do not borrow a connection for nothing.
        if (iter.nonEmpty) {
          val conn = JDBCUtil.getConnection
          try {
            iter.foreach { case ((day, user, ad), count) =>
              println(s"${day} ${user} ${ad} ${count}")
              recordClicks(conn, day, user, ad, count)
            }
          } finally {
            // Hand the connection back even when a record fails.
            conn.close()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Applies the blacklist rule to one aggregated (day, user, ad) count.
  private def recordClicks(conn: java.sql.Connection, day: String, user: String, ad: String, count: Int): Unit = {
    if (count >= 30) {
      // The batch alone reaches the click threshold (30): blacklist the user.
      addToBlackList(conn, user)
    } else {
      // Below the threshold: update today's click count for this (user, ad).
      val selectRowSql =
        """
          | select
          |     *
          | from user_ad_count
          | where dt = ? and userid = ? and adid = ?
          |""".stripMargin

      if (JDBCUtil.isExist(conn, selectRowSql, Array[Any](day, user, ad))) {
        // The row exists: accumulate the count.
        val addClicksSql =
          """
            | update user_ad_count
            | set count = count + ?
            | where dt = ? and userid = ? and adid = ?
            |""".stripMargin
        JDBCUtil.executeUpdate(conn, addClicksSql, Array[Any](count, day, user, ad))

        // Re-check the stored total; blacklist the user once it reaches the threshold.
        val overThresholdSql =
          """
            |select
            |    *
            |from user_ad_count
            |where dt = ? and userid = ? and adid = ? and count >= 30
            |""".stripMargin
        if (JDBCUtil.isExist(conn, overThresholdSql, Array[Any](day, user, ad))) {
          addToBlackList(conn, user)
        }
      } else {
        // No row yet: create it with this batch's count.
        val insertRowSql =
          """
            | insert into user_ad_count ( dt, userid, adid, count ) values ( ?, ?, ?, ? )
            |""".stripMargin
        JDBCUtil.executeUpdate(conn, insertRowSql, Array[Any](day, user, ad, count))
      }
    }
  }

  // Inserts the user into the blacklist table (no-op update when already present).
  private def addToBlackList(conn: java.sql.Connection, user: String): Unit = {
    val sql =
      """
        |insert into black_list (userid) values (?)
        |on DUPLICATE KEY
        |UPDATE userid = ?
        |""".stripMargin
    JDBCUtil.executeUpdate(conn, sql, Array[Any](user, user))
    ()
  }

  // Reads every blacklisted user id from MySQL, releasing all JDBC resources even on failure.
  private def loadBlackList(): Set[String] = {
    val conn = JDBCUtil.getConnection
    try {
      val pstat = conn.prepareStatement("select userid from black_list")
      try {
        val rs: ResultSet = pstat.executeQuery()
        try Iterator.continually(rs).takeWhile(_.next()).map(_.getString(1)).toSet
        finally rs.close()
      } finally {
        pstat.close()
      }
    } finally {
      conn.close()
    }
  }

  // One ad click: timestamp (epoch millis as text), area, city, user id, ad id.
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
package com.lotuslaw.spark.streaming

import com.lotuslaw.spark.util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.ResultSet
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// 恢复
// Requirement 2: reads ad click records from Kafka and accumulates, per 3-second batch,
// the number of clicks for every (day, area, city, ad) key into the `area_city_ad_count`
// table through JDBCUtil.
object Spark_Stream_Req2 {

  /**
   * Entry point. Builds the streaming pipeline, starts it and blocks until termination.
   *
   * Each Kafka record value is expected to be a space-separated line with at least five
   * fields in the order: ts area city user ad, where ts is an epoch timestamp in
   * milliseconds (it is parsed with `toLong` and passed to `new Date`).
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("lotuslawNew"), kafkaPara)
    )

    // Parse every Kafka message into an AdClickData record.
    val adClickData = kafkaDataDS.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    // Count clicks per (day, area, city, ad) within the batch.
    // The formatter is created once per partition instead of once per record;
    // a partition is consumed by a single task, so sharing it inside the partition is safe.
    val reduceDS = adClickData.mapPartitions(
      clicks => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        clicks.map(
          data => {
            val day = sdf.format(new Date(data.ts.toLong))
            ((day, data.area, data.city, data.ad), 1)
          }
        )
      }
    ).reduceByKey(_ + _)

    // Insert a new row, or add the batch count to the existing row for the same key.
    val upsertSql =
      """
        | insert into area_city_ad_count ( dt, area, city, adid, count )
        | values ( ?, ?, ?, ?, ? )
        | on DUPLICATE KEY
        | UPDATE count = count + ?
        |""".stripMargin

    reduceDS.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          iter => {
            // Do not open a connection for partitions that carry no data.
            if (iter.hasNext) {
              val conn = JDBCUtil.getConnection
              try {
                val pstat = conn.prepareStatement(upsertSql)
                try {
                  iter.foreach {
                    case ((day, area, city, ad), sum) =>
                      pstat.setString(1, day)
                      pstat.setString(2, area)
                      pstat.setString(3, city)
                      pstat.setString(4, ad)
                      // Parameter 5 is the value for a fresh row, parameter 6 the increment
                      // applied when the row already exists.
                      pstat.setInt(5, sum)
                      pstat.setInt(6, sum)
                      pstat.executeUpdate()
                  }
                } finally {
                  // Release the statement even when an update fails.
                  pstat.close()
                }
              } finally {
                // Release the connection even when preparing or executing fails.
                conn.close()
              }
            }
          }
        )
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data record; all fields are kept as the raw strings read from Kafka.
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
package com.lotuslaw.spark.streaming

import com.lotuslaw.spark.util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.text.SimpleDateFormat
import java.util.Date

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// 恢复
// Requirement 3: reads ad click records from Kafka and prints, every 10 seconds,
// the click count of the last 60 seconds grouped into 10-second time buckets.
object Spark_Stream_Req3 {

  /**
   * Entry point. Builds the streaming pipeline, starts it and blocks until termination.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(5))

    val consumerSettings: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val records: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("lotuslawNew"), consumerSettings)
      )

    // Each message value is a space-separated line: ts area city user ad.
    val clicks = records.map { record =>
      val fields = record.value().split(" ")
      AdClickData(fields(0), fields(1), fields(2), fields(3), fields(4))
    }

    // Window: the last minute, recomputed every 10 seconds.
    // Timestamps are floored to the start of their 10-second bucket using
    // integer division, e.g. 12:01 => 12:00, 12:19 => 12:10, 12:59 => 12:50
    // (55 / 10 * 10 => 50, 49 / 10 * 10 => 40, 32 / 10 * 10 => 30).
    val windowedCounts = clicks
      .map(click => (click.ts.toLong / 10000 * 10000, 1))
      .reduceByKeyAndWindow(
        (left: Int, right: Int) => left + right,
        Seconds(60),
        Seconds(10)
      )

    windowedCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data record; all fields are kept as the raw strings read from Kafka.
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
package com.lotuslaw.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.spark.streaming
 * @create: 2021-12-03 9:54
 * @description:
 */

// 恢复
// Requirement 3 (file output variant): reads ad click records from Kafka and, every
// 10 seconds, rewrites output/adclick.json with the click counts of the last 60 seconds
// grouped into 10-second time buckets.
object Spark_Stream_Req3_New {

  /**
   * Entry point. Builds the streaming pipeline, starts it and blocks until termination.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lotuslaw",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("lotuslawNew"), kafkaPara)
    )

    // Each message value is a space-separated line: ts area city user ad.
    val adClickData = kafkaDataDS.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    // Window: the last minute, recomputed every 10 seconds.
    // Timestamps are floored to the start of their 10-second bucket using
    // integer division, e.g. 12:01 => 12:00, 12:19 => 12:10, 12:59 => 12:50
    // (55 / 10 * 10 => 50, 49 / 10 * 10 => 40, 32 / 10 * 10 => 30).
    val reduceDS = adClickData.map(
      data => {
        val ts = data.ts.toLong
        val newTs = ts / 10000 * 10000
        (newTs, 1)
      }
    ).reduceByKeyAndWindow(
      (x: Int, y: Int) => {x + y},
      Seconds(60),
      Seconds(10)
    )

    reduceDS.foreachRDD(
      rdd => {
        // The collected result is formatted on the driver, so one formatter per batch is enough.
        val formatter = new SimpleDateFormat("mm:ss")
        val entries = rdd.sortByKey(true).collect().map {
          case (time, cnt) =>
            val timeString = formatter.format(new Date(time))
            s"""{"xtime": "${timeString}", "yval": "${cnt}"}"""
        }

        // Write the output file (overwritten on every window evaluation).
        val outFile = new File("output/adclick.json")
        // FileWriter fails with FileNotFoundException when the parent directory is missing,
        // so make sure it exists before opening the file.
        Option(outFile.getParentFile).foreach(_.mkdirs())
        val out = new PrintWriter(new FileWriter(outFile))
        try {
          out.println("[" + entries.mkString(",") + "]")
          out.flush()
        } finally {
          // Always release the file handle, even if writing fails.
          out.close()
        }
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data record; all fields are kept as the raw strings read from Kafka.
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
原文地址:https://www.cnblogs.com/lotuslaw/p/15640155.html