Structured Streaming 简单的例子（新 API）

Structured Streaming 简单的例子（新 API）：WordCount 单词计数

package com.briup.streaming.structed

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
 * Structured Streaming word count over a TCP socket source.
 *
 * Reads text lines from `localhost:9999` (for example a server started with `nc -lk 9999`),
 * splits each line into whitespace-separated words, and prints the running count of every
 * word to the console after each micro-batch.
 */
object SocketSourceMyTest {
  def main(args: Array[String]): Unit = {
    // Raise the level of the `org` logger hierarchy (Spark included) to WARN so the
    // console sink output is not buried under INFO messages.
    Logger.getLogger("org").setLevel(Level.WARN)

    // 1. As with Spark SQL, everything starts from a SparkSession.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SocketSourceMyTest")
      .getOrCreate()
    import spark.implicits._

    try {
      // 2. Source: read the socket stream and keep only the text column as a typed Dataset[String].
      val lines = spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
        .select("value")
        .as[String]

      // 3. Transformation: split on runs of whitespace and drop empty tokens, so blank lines
      //    and repeated/leading spaces are not counted as an empty "word".
      val wordCounts = lines
        .flatMap(line => line.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)))
        .toDF("word", "number")
        .groupBy("word")
        .sum("number")

      // 4. Sink: Complete mode re-emits the entire aggregated result table after every trigger.
      val query = wordCounts.writeStream
        .outputMode(OutputMode.Complete())
        .format("console")
        .start()

      // Blocks until the query is stopped; if the query fails, its exception is rethrown here.
      query.awaitTermination()
    } finally {
      // Release the session even when the streaming query terminates with an exception.
      spark.stop()
    }
  }
}
原文地址:https://www.cnblogs.com/Diyo/p/11395051.html