Parsing JSON and CSV with Structured Streaming

1. Read JSON data from Kafka and print it to the console

Kafka Producer:

{"createTime":"1532598069","event":{"info":{"AAA":"three","BBB":"four","CCC":"haha"}}}

Kafka Consumer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._


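// The source leaves the object unnamed; DynamicSchema is assumed here to match appName below.
object DynamicSchema extends App {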
  val spark = SparkSession
    .builder
    .appName("DynamicSchema")
    .master("local[*]")
    .getOrCreate()

  
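  // "event" is a map from an arbitrary key (e.g. "info") to a struct of known fields.
  // DDD is absent from the sample message, so it comes back as null.
  val schema = new StructType()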
    .add("createTime", StringType)
    .add("event", MapType(StringType, new StructType()
      .add("AAA", StringType, true)
      .add("BBB", StringType, true)
      .add("CCC", StringType, true)
      .add("DDD", StringType, true)
    ))

  val parsed = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "dynamic-schema")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

  import spark.implicits._

  // explode turns each map entry into a (key, value) row; value.* flattens the struct into columns.
  val event = parsed.select(explode($"parsed_value.event")).select("value.*")

  val console = event.writeStream
    .format("console")
    .outputMode(OutputMode.Append())

  val query = console.start()

  query.awaitTermination()

}

Structured Streaming output:

+-----+----+----+----+
|  AAA| BBB| CCC| DDD|
+-----+----+----+----+
|three|four|haha|null|
+-----+----+----+----+

2. Read JSON text files and print them to the console

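A Spark application can watch a directory while a web service writes log files into it in real time. To the Spark application, those log files are then a stream of real-time data.

The Structured Streaming file source supports the text, csv, json, orc, and parquet formats.

Put the following data in a file named people.json: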

{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}

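Note: files must be moved into the watched directory (ideally as an atomic move, so Spark never picks up a half-written file), and file names must not contain special characters.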
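
One way to follow the note above is to write each file somewhere else first and then move it into the watched directory in a single step. The sketch below uses only java.nio and is not part of the original post; the names AtomicDrop and publish, and the staging directory, are hypothetical. It assumes the staging and watched directories sit on the same file system, since an atomic move is generally not possible across file systems.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, not a Spark API.
object AtomicDrop {
  /** Writes `content` to a temporary file in `stagingDir`, then moves it
    * into `watchedDir` under `fileName` and returns the final path.
    * Assumes both directories are on the same file system. */
  def publish(stagingDir: Path, watchedDir: Path, fileName: String, content: String): Path = {
    Files.createDirectories(stagingDir)
    Files.createDirectories(watchedDir)
    // The partially written file lives outside the watched directory.
    val tmp = Files.createTempFile(stagingDir, "part-", ".tmp")
    Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
    // ATOMIC_MOVE makes the file appear in the watched directory all at once.
    Files.move(tmp, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

A producer would call something like AtomicDrop.publish(staging, watched, "people.json", jsonLines), where watched is the directory passed to spark.readStream.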

Requirement:

Next, use Structured Streaming to rank hobbies among people younger than 25.

Code:

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
  * {"name":"json","age":23,"hobby":"running"}
  * {"name":"charles","age":32,"hobby":"basketball"}
  * {"name":"tom","age":28,"hobby":"football"}
  * {"name":"lili","age":24,"hobby":"running"}
  * {"name":"bob","age":20,"hobby":"swimming"}
  * Rank hobbies among people younger than 25.
  */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkSession; Structured Streaming also uses the DataFrame/Dataset data model
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    // 2. Read the data
    import spark.implicits._
    // Schema must be specified when creating a streaming source DataFrame.
    // Use forward slashes: "\d" and "\s" are invalid escape sequences in a Scala string literal.
    val dataDF: DataFrame = spark.readStream.schema(schema).json("D:/data/spark/data")
    // 3. Process the data
    val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
    // 4. Write the result
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
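
As a sanity check on what the query should report for the five sample rows, the same filter, group, count, and sort can be reproduced with plain Scala collections. This is only an illustration of the logic and does not use Spark; the names HobbyRanking, Person, and rank are hypothetical, and ties are broken by hobby name, which is an added assumption because the streaming query does not define a tie order.

```scala
// Hypothetical stand-in for the streaming query, using ordinary collections.
object HobbyRanking {
  final case class Person(name: String, age: Int, hobby: String)

  // Sample rows copied from people.json.
  val people: Seq[Person] = Seq(
    Person("json", 23, "running"),
    Person("charles", 32, "basketball"),
    Person("tom", 28, "football"),
    Person("lili", 24, "running"),
    Person("bob", 20, "swimming")
  )

  /** Counts hobbies of people younger than `maxAge`, most popular first.
    * Ties are ordered by hobby name (an assumption made for determinism). */
  def rank(rows: Seq[Person], maxAge: Int): Seq[(String, Int)] =
    rows
      .filter(_.age < maxAge)
      .groupBy(_.hobby)
      .map { case (hobby, group) => hobby -> group.size }
      .toSeq
      .sortBy { case (hobby, count) => (-count, hobby) }
}
```

For the sample data, HobbyRanking.rank(HobbyRanking.people, 25) yields running with a count of 2 followed by swimming with a count of 1, which is what the console sink should show once people.json has been processed.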

3. Read CSV data and print it to the console

CSV data:

liwei,20,中国,2019-05-14
liwei,10,中国,2019-06-15
zhangsan,20,中国,2019-05-16
zhangsan,10,中国,2019-06-17

Code:

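import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object cvsToEs_demo3 {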

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sample-structured-streaming")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Schema matching the sample data. Note: the date column uses the format `yyyy-MM-dd`
    val userSchema = new StructType()
      .add("name", "string")
      .add("age", "integer")
      .add("address", "string")
      .add("date", "string")

    val csvDatas: DataFrame = spark.readStream
      .option("sep", ",") // comma-separated fields
      .schema(userSchema)
      .csv("data/csv/") // directory to watch

    csvDatas.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }

}
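
The schema above keeps the date column as a plain string in `yyyy-MM-dd` form. To make that format concrete, here is a minimal sketch in plain Scala (no Spark) that parses one sample line into typed fields. The names CsvLine, User, and parse are hypothetical and only mirror the four schema columns; the sketch assumes fields never contain commas.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper that mirrors the name/age/address/date schema.
object CsvLine {
  final case class User(name: String, age: Int, address: String, date: LocalDate)

  // Same pattern as the one noted for the date column.
  private val dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  /** Parses one comma-separated line; throws IllegalArgumentException
    * when the line does not have exactly four fields. */
  def parse(line: String): User =
    line.split(",", -1).map(_.trim) match {
      case Array(name, age, address, date) =>
        User(name, age.toInt, address, LocalDate.parse(date, dateFormat))
      case other =>
        throw new IllegalArgumentException(s"expected 4 fields, got ${other.length}")
    }
}
```

For example, CsvLine.parse("liwei,20,中国,2019-05-14") gives a User whose date is LocalDate.of(2019, 5, 14). A value that does not follow the pattern makes LocalDate.parse throw a DateTimeParseException.
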
Original article: https://www.cnblogs.com/chong-zuo3322/p/13524832.html