[Spark]-结构化流之输出篇(待重修)

5.结构化流的输出

  一旦定义好了streaming DataFrame/Dataset的最终结果,剩下的就是一些计算输出了.为此,必须使用 DataStreamWriter通过 Dataset.writeStream() 返回.此时必须以下一个或多个

   输出落地 的详细信息:  Data format, location 等等

   输出模式(Output mode)

   查询名称(Query name)  可选,指定用于标识的查询的唯一名称

   触发器间隔(Trigger interval):可选地,指定触发间隔。如果没有指定,系统将检查尽快获得新数据前处理完成。如果一个触发时间错过了,因为前面的处理还没有完成,则系统将触发立即处理

   检查点位置(Checkpoint location) 对于可以保证 end-to-end fault-tolerance (端对端容错)能力的某些 output sinks ,请指定系统将写入所有 checkpoint (检查点)信息的位置。

                   这应该是与 HDFS 兼容的容错文件系统中的目录.

  5.1 输出模式

    追加(append)(默认) 这是默认的模式.在上次触发后,只有新行添加到结果表(result-table)才会触发。

             这是只支持那些查询,行添加到结果表永远不会改变,因为这种模式保证每一行将只输出一次。例如,只有选择查询,地图,flatMap,过滤,加入等将支持附加模式

    完全(complete) 每次触发时都会将整个结果表(result-table)输出.例如聚合结果等

    更新(update) 只有结果表在上次被触发后被更新才会触发

    不同的查询支持不同的输出模式,具体见下:

查询类型   支持模式 描述
带聚合的查询 聚合带事件时间的水印 Append, Update, Complete

Append 使用水印来删除旧的聚集状态 窗口将在水印最终阈值的时候聚合.所以结果将在水印过期,最终结果确定的情况下添加进结果表

Update 使用水印来删除旧的聚集状态

Complete 不会使用水印来删除旧的聚集状态

不带水印的聚合 Complete, Update 由于没有使用水印,旧的聚集状态不会被删除.不支持append,因为对整数据的聚合结果会不断更新,不符合append的语义
mapGroupsWithState Update  
flatMapGroupsWithState 追加操作模式 Append flatMapGroupsWithState 之后允许 Aggregations (聚合)
  更新操作模式 Update flatMapGroupsWithState 之后不允许 Aggregations (聚合)。
带join的查询 Append join不支持Update,Complete模式
其它查询 Append, Update 不支持Complete.因为非聚合数据的结果表,全部保存进结果表时不允许的

  5.2 输出落地

      5.2.1 落地文件

        落地文件,是适合长期保存.这里的文件是指HDFS等任何Hadoop支持的文件系统.

        落地文件,支持自动分区.并且支持有且仅有一次的容错        

          val query =
            streamingSelectDF
              .writeStream
              .format("parquet")
              .option("path", "文件保存路径")
              .partitionBy("zip", "day") //分区字段
              .trigger(ProcessingTime("25 seconds"))
              .start()

    5.2.2 落地Kafka

      kafka专章描述,具体见后

    5.2.3 落地控制台(debug使用)

       输出被直接打印到控台或者stdout日志

    5.2.4 落地Memory接收器(debug使用)

    5.2.5 foreach

       foreach落地,是用户自定义输出的一种方式.foreach需要用户实现 ForeachWriter 类.实际处理取决于用户如何实现.(foreach只能使用在Scala/Java中)

      它将在触发器(trigger)之后生成结果行时调用一个用户实现.

      注意: foreach的实现载体是多个executor中的某一个

      5.2.5.1 实现

        open: writer 初始化时执行(例如打开连接,开启事务等).它具有两个入参: version=>每个触发器单调递增的ID  partitionId =>分区ID.

           open将返回一个布尔值.当为false时,后续将不会产生任何调用.用户可以根据自己的逻辑,用这个返回值指出本次输出后续是否还有必要执行

        proccess:根据open的返回值决定是否需要执行.

        close:无论open返回什么值,close必然会执行.这里适合用户做一些资源回收操作       

            class WriterToMysql extends ForeachWriter[Row] {
            private var connection: Connection = null;
            
            override def open(partitionId: Long, version: Long): Boolean = {
                Class.forName("com.mysql.jdbc.Driver")
                connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false", "root", "12abAB")
                return true
            }
            
            override def process(value: Row): Unit = {
                val stmt = connection.createStatement()
                try {
                stmt.execute(
                    s"""
                    |replace into `test`.`structured_streaming_window_watermark_app_test`
                    |(`window`,`word`,`count`)
                    |VALUES
                    |('${value.get(0).toString}','${value.get(1)}',${value.getLong(2)});
                    """.stripMargin)
                }
                finally {
                stmt.close()
                }
            }
            
            override def close(errorOrNull: Throwable): Unit = {
                try {
                if (connection != null) {
                    connection.close()
                }
                }
                finally {}
            }
            }
            //使用
            val query = windowWordCounts.writeStream.outputMode("update")
                .foreach(new WriterToMysql())
                .option("truncate", "false")
                .start();

    5.2.6 输出落地可以承载的输出模式

接收器 支持的输出模式 可选项 容错机制 备注
文件 Append path 支持容错(有且仅有一次) 支持分区
kafka Append, Update, Complete 详见kafka专章 支持容错(最少一次)  
foreach Append, Update, Complete   取决于foreach的实现  
控制台 Append, Update, Complete numRows:每次打印的行数(默认20),truncate:输出太长时截断(默认true) 不支持容错  
内存 Append, Complete   不支持容错,但重启时重建整张结果表  

  5.3 触发器

    Spark2.3 已经具有以下四种触发器

    默认触发器

      如果不设置任何触发器,将默认使用此触发器.触发规则是:上一个微批处理结束,启动下一个微批处理

    时间间隔触发(Fixed interval micro-batches)

      设置了触发间隔的触发器. 

        如果上一个微批处理在触发间隔前结束,会等待间隔时间结束后再触发.

        如果在触发间隔前还没有处理结束.后续暂时不会触发,会等待上个微批处理结束后再立即启动

      .trigger(Trigger.ProcessingTime("2 seconds"))

    只触发一次(One-time micro-batch)

      触发一次处理全部数据,然后自然停止.适合用在关闭集群时数据的最后处理和清理

      .trigger(Trigger.Once())

    连续触发器(Continuous with fixed checkpoint interval)

      连续处理是Spark2.3才提供的触发器.它是以一个低延迟,连续处理模式(没有间隔概念)的模式进行处理.

      .trigger(Trigger.Continuous("1 second"))

      不过很可惜,Spark2.3中连续处理依然只是一个实验版本,在此只做一个简单介绍.

        连续处理会启动多个长时间的运行的任务.这些任务不断地从源中读取数据、处理数据并不断地向接收器写入数据。查询所需的任务数量取决于查询可以并行地从源中读取多少个分区

          因此,在开始一个连续的处理查询之前,必须确保集群中有足够多的内核并行执行所有任务.(10分区的Kafka,必须至少有10个内核才能进行查询)

        现阶段连续处理只能接收Map系操作,还不支持聚合系操作.

        支持的数据源只Kafka,Rate两种

        不支持失败任务的重试.任何失败都会导致查询终止,必须手动的从检查点恢复.

      

原文地址:https://www.cnblogs.com/NightPxy/p/9278881.html