[Spark]-Streaming-输入

1.Input DStreams 和 Receivers

  Spark Streaming的输入由两个部分组成 Input DStream 和 Receiver

  Input DStream  代表的是从数据源接收到由输入数据组成的数据流

  Receiver 是从数据源获取数据并写入Spark内存的实际执行者.每一个Input DStream都会与一个Receiver 关联.

  如果一个Spark Streaming 应用需要并行的接收多个数据流.可以创建多个Input DStream(这将同时创建多个Receiver)

    需要注意的是,在Spark Streaming应用中,executor运行的将是一个长期任务.因为每一个executor都将长期占用某一个核(文件流某些情况除外)

    .因此,Spark Streaming应用必须保证有足够多的核来运行.

      这里的足够多核是指: 可用核必须大于Receiver数.因为计算本身也需要一个executor.否则系统只能正确接收数据,但无法实际处理数据.

      比如说:本地模式 "local/local[1]",唯一的核被交给Receiver 而没有执行计算,所以是无法得到计算结果的.

2.内置数据源

  2.1 File Streams

     从文件系统读取数据转化为DStream.(这里的文件系统包括本地文件系统,或是HDFS,S3等任何Hadoop支持的分布式文件系统)

     File Stream 有以下要点:

      i).不支持嵌套目录.换句话说,只有直接处于目标目录下的文件才能被监控读取.

      ii).目录支持通配符.在这种情况下,Spark Streaming将对符合条件的目录列表进行监控

      iii).目录下的所有文件必须有统一的格式

      iv).目录下的某一个文件,视为某个时间范围数据的一部分.这个时间范围的判定标准是文件修改时间而不是创建时间

      v).文件在窗口一次执行中仅读取一次.在这种情况,文件的后续写入无效.(2.3新特性,历史是文件任何情况下都不允许修改)

      vi).目录下的文件越长,扫描一次修改时间所花费的时间就越多.哪怕文件其实并没有任何修改

      vii).只有修改时间落在某个窗口范围内,才会被某个窗口拉取.(依次,可以使用 FileSystem.setTimes() 设置文件修改时间来让文件落入到之后的某个窗口中)

              viii).注意不同系统下的文件修改时间策略

        比如HDFS,文件修改时间是在写入设置的.这样当某个窗口读取时,可能文件还没有写入完毕而继续保持打开状态,从而导致读取失败

        或者S3系统,文件修改时间是在写入完毕之后拷贝写入.这样可能导致数据本来应该落入某个窗口而无法落入.

        总之,用户需要仔细理清目标环境下的文件修改时间策略,让Spark能以用户期望的行为来运行.

   文件流读取时,如果是一个简单文件读取(streamingContext.textFileStream(dataDirectory),将不会单独占用一个核<=开发环境中,生产很难有如此简单文件读取的情况

  2.2 Socket 

    ssc.socketTextStream("127.0.0.1", 9999)

    一般开发环境中使用

  2.3 高级数据源

    Spark Streaming 已经内置一些诸如 Kafka, Kinesis 和 Flume 等高级数据源,这些后面专章介绍.

3.自定义数据源

  3.1 数据源的类型

    Spark Streaming,依据收到数据后,是否需要回发数据源ack消息(确认收到数据),将数据源分为两大类 不可靠数据源和可靠数据源

  3.2 不可靠数据源

    继承实现 Receiver[T],并需要指定数据存储级别,和实现 onStart(): Unit 和 onStop(): Unit 两个方法.并在其中内置了store和 restart两个方法

    一个不可靠数据源Demo如下   

            /**
            * 自定义Socket数据源
            * 存储级别为: StorageLevel.MEMORY_AND_DISK_2
            *   MEMORY_AND_DISK 表示保存直内存,但允许溢出时到磁盘  _2 表示存储的副本数
            *   
            * 使用方法: ssc.receiverStream(new CusReceiver("127.0.0.1", 9999))
            */
            class CusReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
            /**
                * 接收器启动时调用onStart方法,用于初始化接收数据所需要的所有资源
                * 注意:
                *   onStart必须保持非阻塞的方式执行,因为接收器运行是在不同线程中(线程切换)
                */
            override def onStart(): Unit = {
                new Thread("Socket Receiver") {
                override def run() {
                    receive()
                }
                }.start()
            }
            
            /**
                * 当接收器关闭时调用 onStop(),用于清理资源等
                */
            override def onStop(): Unit = {}
            
            private def receive(): Unit = {
                var socket: Socket = null
                var userInput: String = null
                try {
                socket = new Socket(host, port);
                val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
                userInput = reader.readLine()
            
                while (!isStopped && userInput != null) {
                    /**
                    * 将读取的数据保存至内存中
                    *   这里将应用 Receiver构造设置的存储级别来进行保存
                    */
                    store(userInput)
                    userInput = reader.readLine()
                }
                reader.close()
                socket.close()
            
                /**
                    * Spark Streaming的接收器重启(这是一个Future模式的异步操作)
                    *   这将立即关闭接收器并调用onStop,并在之后的某个时间启动接收器(调用onStart)
                    *
                    * 从源码可以看出,receiver是长期占用某个核.哪怕重启也是以线程休眠的形式,并不会释放核
                    * 具体过程如下:
                    *   1.记录重启原因日志
                    *   2.停止当前接收器(调用onStop)
                    *   3.线程休眠(Spark .stream . receiverrestartdelay)毫秒
                    *   4.启动当前接收器(调用onStart)
                    */
                restart("Trying to connect again")
                } catch {
                case e: java.net.ConnectException =>
                    restart("Error connecting to " + host + ":" + port, e)
                case t: Throwable =>
                    restart("Error receiving data", t)
                }
            }
            }

    

  3.3 可靠数据源

    一个不可靠数据源只是单纯的拉取数据,然后调用store保存而无需考虑其它的逻辑.但如果是可靠数据源,则必须要考虑以下:

      强容错的保证,保证零数据丢失

      接收放实现控制块生成和处理接收速率.

      实现目标数据源的Ack机制

    未完,demo后补

原文地址:https://www.cnblogs.com/NightPxy/p/9292063.html