spark-Streaming

监听文件-定时文件监听
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

// local[3] 至少开启2个; 一个用于监听文件,一个用于处理数据
val sparkConf = new SparkConf().setAppName("fileStream").setMaster("local[3]")
// 间隔20秒查看一次
val ssc = new StreamingContext(sparkConf, Seconds(10))
// 设置监听的文件夹
val lines = ssc.textFileStream("D:\sparkStreamLog")
// 处理并打印监听到的内容
lines.print()
// 开启 spark stream
ssc.start()
//阻塞等待计算
ssc.awaitTermination()
-------------------------------------------
Time: 1565595490000 ms
-------------------------------------------
此处打印监听的内容...
-------------------------------------------
Time: 1565595500000 ms
-------------------------------------------
监听文件-结构化数据流

数据

{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder()
.appName("dStream_1")
.master("local[*]")
.getOrCreate()

// 导入rdd的隐式转换
import spark.implicits._

// 因为输入的数据是结构化数据,因此在创建DataFrame时,需要先定义好schema,这样spark程序才知道如何解析json数据;
val userSchema = new StructType()
.add("name","string") // 定义name字段
.add("age","integer")
.add("hobby","string")


// 创建DataFrame
val userDF = spark.readStream
.schema(userSchema)  // 设置字段解析
.json("D:/JsonFile")  // 读取数据文件

val userlittler25DF = userDF.filter($"age"<25)  // 筛选age小于25的数据

val hobbyDF = userlittler25DF.groupBy("hobby").count()  // 对hobby字段进行分组求和

val query = hobbyDF.writeStream
.outputMode("complete")  // 设置输出模式为complete
.format("console") // 输出到控制台
.start() // 开始执行

query.awaitTermination()  // 等待执行结果

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+-----+
|   hobby|count|
+--------+-----+
| running|    2|
|swimming|    1|
+--------+-----+
监听端口套接字

随机读取文件数据发送

table.txt 数据

scala java
java C++
C C++ PHP
C++ PHP
python C++
PHP java

数据发送服务程序

import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

def main(args: Array[String]): Unit = {
    val file = "D:\sparkStreamLog\change\table.txt"
    //读取文件的每一行数据到list里面
    val lines = Source.fromFile(file).getLines().toList
    //获得数据行数
    val rowCount = lines.length
    //设置客户端的端口
    val listen = new ServerSocket(6666)

    while (true) {
        // 等待端口被连接
        val socket = listen.accept()
        // 创建一个端口连接后的处理线程
        val thread = new Thread() {
            //重写run方法,线程启动后自动调用run方法
            override def run = {
                // 打印客户端的 IP地址
                println("客户端地址为:" + socket.getInetAddress)
                // 获取的客户端的输出流(可以向服务器发送(写)数据)
                val send = new PrintWriter(socket.getOutputStream, true)
                while (true) {
                    //每隔3秒发送一次数据
                    Thread.sleep(3000)
                    // 随机获取 list里的一条数据
                    val content = lines(index(rowCount))
                    println("******")
                    println(content)
                    // 向服务器发送一条数据
                    send.write(content + "
")
                    // 刷新写入
                    send.flush()
                }
                // 当连接断开时,socket也断开连接
                socket.close()
            }
        }
        //启动处理线程
        thread.start()
    }
}

// 生成一个 0到length的随机数
def index(length: Int): Int = {
    val rd = new java.util.Random()
    rd.nextInt(length) //随机获取0-length 之间的一个数
}

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}


val Conf = new SparkConf().setAppName("套接字流").setMaster("local[2]")
val ss = new StreamingContext(Conf, Seconds(6))
// 监听客户端正在使用的 6666 端口,接收发送的信息
val lines = ss.socketTextStream("localhost", 6666)
// 处理和打印 发送过来的数据 进行WordCount处理
lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y).print()
//启动 spark stream
ss.start()
// 等待接收和处理数据
ss.awaitTermination()
-------------------------------------------
Time: 1565598120000 ms
-------------------------------------------
(PHP,1)
(java,2)
(C++,1)
-------------------------------------------
Time: 1565598126000 ms
-------------------------------------------
(scala,2)
(java,2)

监听端口数据-统计包括历史数据

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")

val sc = new StreamingContext(conf, Seconds(5))

// 设置检查点,检查点具有容错机制; 用于存放之前处理好的数据的文件夹
sc.checkpoint("D:\sparkStreamLog\change")

// 从6666端口读取发送过来的数据
val lines = sc.socketTextStream("localhost", 6666)

//定义状态更新函数,历史数据和新数据的处理方式
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    //      values: 新数据key的values   state: 历史数据key的values
    // 新数据的value求和
    val currentCount = values.foldLeft(0)(_ + _) //初始值为 0 防止遍历到其他key的value为空时相加出现异常
    // 获取历史数据key对应的值,没有则返回 0
    val previousCount = state.getOrElse(0)
    println("#################", values.toBuffer, state.toBuffer, "	 result: " + (currentCount + previousCount))
    //返回 新数据value+历史数据value
    Some(currentCount + previousCount)
}

// 处理数据
val wordDstream = lines.flatMap(_.split(" "))
.map(x => (x, 1))
.updateStateByKey[Int](updateFunc) // 更新数据, 将之前的结果和现在处理的结果合并统计并输出

//    保存数据到MySQL
wordDstream.foreachRDD(rdd => {
    // 创建用于将每条数据插入MySQL数据库的方法, 接受一个装数据的迭代器
    def funChange(words: Iterator[(String, Int)]): Unit = {
        // 创建MySQL连接
        var conn: Connection = null
        // 用于执行SQL语句
        var stat: PreparedStatement = null

        try {
            val url = "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC"
            val user = "root"
            val password = "123456"
            // 连接MySQL
            conn = DriverManager.getConnection(url, user, password)
            // 遍历每一条数据
            words.foreach(word => {

                val sql = "insert into fromsparkdata(word,counts) values (?,?)"
                // 包装SQL语句(反SQL语句注入)
                stat = conn.prepareStatement(sql)
                // 向第一个 ? 放入数据
                stat.setString(1, word._1.trim)
                // 第二个 ? 放入数据
                stat.setInt(2, word._2.toInt)
                // 提交执行 更新数据的操作
                stat.executeUpdate()
            })
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            // 关闭操作
            if (stat != null) {
                stat.close()
            }
            // 关闭连接
            if (conn != null) {
                conn.close()
            }
        }
    }

    // 重新划分为3个分区
    val reparRdd = rdd.repartition(3)

    // 遍历每一个分区的数据迭代
    reparRdd.foreachPartition(funChange)
})

//    保存数据到文件夹
//    wordDstream.saveAsTextFiles("D:\sparkStreamLog\change\data\")

// 开启 spark stream 开始监听
sc.start()
sc.awaitTermination()
                   新数据统计的个数  历史数据的个数       返回的个数
(#################,ArrayBuffer(1),ArrayBuffer(),	 result: 1)
(#################,ArrayBuffer(1),ArrayBuffer(),	 result: 1)

(#################,ArrayBuffer(1, 1),ArrayBuffer(),	 result: 2)
(#################,ArrayBuffer(1),ArrayBuffer(),	 result: 1)

(#################,ArrayBuffer(1),ArrayBuffer(1),	 result: 2)
(#################,ArrayBuffer(),ArrayBuffer(1),	 result: 1)
监听端口结构化数据
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}


val spark = SparkSession.builder()
.appName("dStream_1")
.master("local[*]")
.getOrCreate()

import spark.implicits._

val lines = spark.readStream
.format("socket") // 读取的数据流类型
.option("host", "localhost") // IP地址
.option("port", 6666) // 端口
.load() // 监听数据 返回DataFrame类型

// lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例 type DataFrame = Dataset[Row]
val words = lines.as[String]

.flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
.outputMode("complete") // outputMode设置了’complete’模式,即每次都输出全部结果数据
.format("console") // format定义输出媒介,这里为控制台
.start() // 开始 查询

query.awaitTermination() // 等待查询结果
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  C++|    1|
|    C|    1|
|  PHP|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|   C++|    5|
|     C|    2|
| scala|    1|
|   PHP|    5|
|  java|    2|
|python|    1|
+------+-----+
监听RDD队列数据
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}


val sparkConf = new SparkConf().setAppName("RDDQueue").setMaster("local[2]")
// 10 秒检查一次队列是否有新数据(以添加一次的数据就处理一次,时间只是处理的间隔时间)
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 创建可变的rdd队列
val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
// 监听rdd队列
val queueStream = ssc.queueStream(rddQueue)
//处理数据并打印
queueStream.map(r => (r % 10, 1)).reduceByKey(_ + _).print()
//启动 spark streaming 开始监听
ssc.start()
//每隔3秒添加数据到队列里面
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
Thread.sleep(3000)
}

// 等待数据处理完再关闭
Thread.sleep(30000)
ssc.stop()
-------------------------------------------
Time: 1565598778000 ms
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
原文地址:https://www.cnblogs.com/studyNotesSL/p/11341313.html