Structured Streaming demo


	import org.apache.spark.sql.SparkSession

	import org.apache.spark.sql.functions._

	import org.apache.spark.sql.types.TimestampType

	import org.apache.spark.sql.streaming.Trigger

	import java.sql.Timestamp

	

	/**
	 * Structured Streaming job that reads stock quotes from the Kafka topic `stock`,
	 * groups them per company into event-time windows, and writes the highest, lowest
	 * and closing price of each window through `HBaseWriter`.
	 *
	 * Expected record layout (as consumed by the parsing step below):
	 *  - Kafka key   : company number
	 *  - Kafka value : comma-separated `timestamp,price,bidprice,sellpirce,avgprice`
	 *
	 * `ClosePriceUDAF` and `HBaseWriter` are not defined in this file; they must be
	 * provided elsewhere in the project.
	 */
	object StockCCICompute {

	  /**
	   * Builds the streaming query and blocks until it terminates.
	   *
	   * @param args command-line arguments (unused)
	   */
	  def main(args: Array[String]): Unit = {

	    val spark = SparkSession
	      .builder
	      .appName("StockCCICompute")
	      .getOrCreate()

	    // Window length, maximum tolerated lateness (watermark delay) and trigger interval.
	    val windowDuration = "30 minutes"
	    val waterThreshold = "5 minutes"
	    val triggerTime = "1 minutes"

	    import spark.implicits._

	    spark.readStream
	      .format("kafka")
	      .option("kafka.bootstrap.servers", "broker1:port1,broker2:port2")
	      .option("subscribe", "stock")
	      .load()
	      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
	      .as[(String, String)]
	      // Parse the payload. Records with a null value or fewer than five fields are
	      // skipped instead of throwing, so one malformed message cannot fail the query.
	      .flatMap(record => {
	        val companyNo = record._1
	        val infos = Option(record._2).map(_.split(",")).getOrElse(Array.empty[String])
	        if (infos.length >= 5) {
	          Seq((companyNo, infos(0), infos(1), infos(2), infos(3), infos(4)))
	        } else {
	          Seq.empty[(String, String, String, String, String, String)]
	        }
	      })
	      .toDF("companyno", "timestamp", "price", "bidprice", "sellpirce", "avgprice")
	      .selectExpr(
	        "CAST(companyno AS STRING)",
	        "CAST(timestamp AS TIMESTAMP)",
	        "CAST(price AS DOUBLE)",
	        "CAST(bidprice AS DOUBLE)",
	        "CAST(sellpirce AS DOUBLE)",
	        "CAST(avgprice AS DOUBLE)")
	      .as[(String, Timestamp, Double, Double, Double, Double)]
	      // Set the watermark on the event-time column.
	      .withWatermark("timestamp", waterThreshold)
	      .groupBy(
	        window($"timestamp", windowDuration),
	        $"companyno")
	      // Highest, lowest and closing price per window; the closing price comes from
	      // the custom aggregate ClosePriceUDAF.
	      .agg(
	        max(col("price")).as("max_price"),
	        min(col("price")).as("min_price"),
	        // The alias belongs on the aggregate result, not on its input column.
	        ClosePriceUDAF(col("price")).as("latest_price"))
	      .writeStream
	      .outputMode("append")
	      .trigger(Trigger.ProcessingTime(triggerTime))
	      // Write each result row out via HBaseWriter (HBase sink).
	      .foreach(HBaseWriter)
	      .start()
	      .awaitTermination()

	  }

	}



原文地址:https://www.cnblogs.com/weijiqian/p/14341981.html