Spark Streaming

Spark Streaming

1、介绍

Spark Streaming是Spark core API的扩展,针对实时数据流计算,具有可伸缩性、高吞吐量、自动容错机制的特点。数据源可以来自于多种方式,例如kafka、flume等等。使用类似于RDD的高级算子进行复杂计算,像map、reduce、join和window等等。最后,处理的数据推送到数据库、文件系统或者仪表盘等。也可以对流计算应用机器学习和图计算。

spark_025

在内部,spark streaming接收实时数据流,然后切割成一个个批次,然后通过spark引擎生成result的数据流。

spark_026

Spark Streaming提供了称为离散流(DStream-discretized stream)的高级抽象,代表了连续的数据流。离散流通过kafka、flume等源创建,也可以通过高级操作像map、filter等变换得到,类似于RDD的行为。内部,离散流表现为连续的RDD。

2、体验Spark Streaming编程

  1. 创建模块,添加spark-streaming的maven依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    

  2. 编写word count的scala程序

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Spark Streaming程序
      */
    object WordCountStreamingScala {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("streaming")
        conf.setMaster("local[*]")
    
        //创建SparkStreamingContext
        val sc = new StreamingContext(conf , Seconds(1)) ;
    
        //行流,对接套接字文本流
        val lines = sc.socketTextStream("s101" , 8888)
    
        //单词流
        val words = lines.flatMap(_.split(" "))
    
        //对流
        val pairs = words.map((_, 1))
    	
        //计算结果
        val result = pairs.reduceByKey(_+_)
    
        //打印结果
        result.print()
    
        //启动上下文
        sc.start()
        
        //等待停止
        sc.awaitTermination()
      }
    }
    
    
  3. 导入log4j属性配置文件,修改日志级别,否则输出过多信息,不利用观察

    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    # Set everything to be logged to the console
    # 修改这里的INFO为ERROR
    # log4j.rootCategory=INFO, console
    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
    # Set the default spark-shell log level to WARN. When running the spark-shell, the
    # log level for this class is used to overwrite the root logger's log level, so that
    # the user can have different defaults for the shell and regular Spark apps.
    log4j.logger.org.apache.spark.repl.Main=WARN
    
    # Settings to quiet third party logs that are too verbose
    log4j.logger.org.spark_project.jetty=WARN
    log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
    log4j.logger.org.apache.parquet=ERROR
    log4j.logger.parquet=ERROR
    
    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
    
    

  4. 启动nc服务器

    $>nc -lk 8888
    

  5. 启动streaming程序

    spark_027

  6. 在nc服务器程序命令输入单词

    spark_028

  7. 在Streaming控制台查看结果输出

    spark_029

  8. 编写java版流应用程序

    package com.oldboy.bigdata.spark.java;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * spark streaming java版
     */
    public class WordCountStreamingJava {
      public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf() ;
        conf.setAppName("streaming java") ;
        conf.setMaster("local[*]") ;
    
        //创建上下文
        JavaStreamingContext sc = new JavaStreamingContext(conf , 
                                                           Durations.seconds(1)) ;
        //行
        JavaDStream<String> lines = sc.socketTextStream("s101", 8888);
        //单词
        JavaDStream<String> words = lines.flatMap(
          new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
              String[] arr = s.split(" ") ;
              return Arrays.asList(arr).iterator();
            }
        }) ;
        //对流
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
          new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
              return new Tuple2<String, Integer>(s, 1);
            }
        }) ;
    
        //结果
        JavaPairDStream<String, Integer> res= pairs.reduceByKey(
          new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
              return v1 + v2 ;
            }
        }) ;
    
        //输出结果
        res.print();
    
        //启动
        sc.start();
        sc.awaitTermination();
      }
    }
    
    

3、基本概念

3.1 StreamingContext初始化

appName是应用程序名称,master是Spark,、Mesos或YARN,也可以是local,local是本地模式运行。实际应用不需要指定master值,通过spark-submit提交命令中获取该参数,定义完上下文后,必须要完成如下工作:

  • 通过创建离散流定义数据源
  • 为流定义变换等计算工作
  • streamingContext.start()开始接受数据并进行处理
  • 使用streamingContext.awaitTermination()函数停止应用
  • 手动调用streamingContext.stop()方法停止应用

切记:

  • 上下文启动后,不能设置新的计算方法
  • 上下文停止后,不能重启
  • 流上下文停止后,还会停止SparkContext,如果不希望停止SparkContext,可以通过stop(false)。
  • SparkContext可以重用来创建多个流上下文,新的流上下文创建前需要停止上一个流上下文。

3.2 离散流(DStream)

离散流是Spark流应用的抽象,表示的是连续的数据流,数据流要么从数据源而来,或者通过变换生成。在内部,离散流表示连续的RDD。

spark_030

对离散流的任何应用,都会转换为操纵底层的RDD:

spark_031

底层的RDD变换工作,Spark引擎进行计算。

3.3 Input DStream和Receiver

InputStream Dstream也是一种DStream,从数据源接受的数据流,每个Input DStream都和一个Receiver关联,Receiver是接受数据并存储在Spark内存中。

Spark内部提供了两种类型的源:

  • 基本源

    Spark API直接能够使用,比如FileSystem或Socket连接。

  • 高级源

    像kafka、flume等源需要借助于第三方工具类进行连接。

Spark可以在一个流上下文中创建多个InputStream,就可以进行并行计算,这些创建的多个Input DStream具有相同时间切片,不可能给不同的Input DStream分别设置时间切片,因为时间切片设置在StreamContext中完成,同时也会创建多个Receiver。接受器单独占用一个CPU内核,即在一个单独的线程中死循环方式读取数据,需要分配足够的cpu内核来处理数据。保证CPU内核数据大于Receiver个数。

注意事项:

  • 本地模式下,不能使用local或者local[1],Receiver占据唯一的线程,没有线程执行计算工作。
  • 扩展到集群,内核数大于Receiver个数。

4、Receiver

4.1 内部结构

Receiver内部维护了队列,放置的是Block对象,Block包含blockId的ArrayBuffer两个属性。每个Block对应一个分区,默认每200ms(可通过spark.streaming.blockInterval修改)生成一个Block对象并推送到队列中,在StreamingContext中指定的时间片就是一个RDD的时长,因此每个RDD含有多少分区,只要计算一下是200ms的多少倍,然后就可以确定RDD内含有多少个分区了,但如果没有产生数据,就就不会生成分区,因此分区数不会超过这个倍数。内部结果如图所示:

spark_036

4.2 分区数控制

修改块生成间隔即可改变分区数,代码如下:

//块间隔设置,org.apache.spark.streaming.receiver.BlockGenerator#103
conf.set("spark.streaming.blockInterval", "500ms")

4.3 限速处理

  • 每秒接收记录数

    Spark Streaming可以控制每个Receiver每秒接收消息条数的上限,默认没有设置,就没有上限。该种方式缺点可能对集群处理能力估计不足,导致计算资源浪费。

    //每秒最多接受20条记录
    conf.set("spark.streaming.receiver.maxRate" , "20")
    

  • 压后(backpress)处理

    可以上spark Streaming基于当前batch的调度延迟与处理时间来控制接收速率,以备让系统只接受系统能够处理的速率。可以通过spark.streaming.backpressure.enabled属性开启,默认该属性是禁用的。对于Receiver的第一个批次的速率限制通过spark.streaming.backpressure.initialRate进行设置。启用压后处理属性后,在Spark Streaming内部,会动态设置Receiver接收速率的最大值。如果设置了spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerP-artition属性,则速率不会超过这一设置。具体配置方式如下:

    //启用压后控制
    conf.set("spark.streaming.backpressure.enabled" , "true")
    
    //设置第一个batch的接收速率
    conf.set("spark.streaming.backpressure.initialRate" , "10000")
    

5、window操作

5.1 介绍

窗口是若干RDD的集合,窗口的长度必须是批次的整倍数,窗口的滑动间隔也必须是批次整倍数。比如每分钟查询最近一小时内的百度热词,一分钟就是窗口的滑动间隔,一小时就是窗口长度。

spark_033

spark_032

5.2 编程实现

val result = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a + b, Seconds(10) , Seconds(3))

6、updateStateByKey

按key更新状态是spark streaming对k-v类型的DStream提供的操作,是对每个K关联一个状态对象,可以是任何对象,该状态对象会随着DStream的流动,从上一个的RDD流向到下一个RDD,工作流程如下图所示:

spark_034

该函数需要传递一个高阶函数,方法签名如下:

def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}

函数有两个参数,Seq[V]是本次RDD中K下的V值列表,Option[S]就跟K关联的状态对象,该函数返回状态对象Option[S],使用Option作为状态类型,意味着状态对象可能不存在。

例如,从应用启动开始,每个单词出现的次数,则使用如下代码实现:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 
  */
object WordCountStreamingUpdateByKeyScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("streaming")
    conf.setMaster("local[8]")
    //创建SparkStreamingContext
    val sc = new StreamingContext(conf , Seconds(2)) ;
    
    /******************************************************
     *                                                    *
     *           注意:此处需要设置检查点目录,存放rdd数据       *
     *                                                    *
     ******************************************************/
    sc.checkpoint("file:///H:\spark\streaming")

    //行流
    val lines = sc.socketTextStream("s101" , 8888)

    //单词流
    val words = lines.flatMap(_.split(" "))

    //对流
    val pairs = words.map((_, 1))

    //窗口化操作.
    val result = pairs.reduceByKey((a:Int,b:Int)=>a + b)

    //
    def updateFunc(a:Seq[Int] , b:Option[Int]) = {
      if(b.isEmpty){
        if(a.isEmpty){
          Some(0)
        }
        else{
          Some(a.sum)
        }
      }
      else{
        val old = b.get
        if(a.isEmpty){
          Some(old)
        }
        else{
          Some(old + a.sum)
        }
      }
    }
    val ds = result.updateStateByKey(updateFunc)
    ds.print()
    sc.start()
    sc.awaitTermination()
  }
}

该带码执行的结果如下:

spark_035

7、避免大量小文件

spark Streaming提供的saveAsTextFile方法是将每个RDD的每个分区输出到一个文件中,由于时间片通常是几秒,因此导致产生大量的小文件,进而影响Namenode的资源以及计算时导致大量的task出现。解决办法就是使用DStream的foreachRDD手动遍历每个分区,按照自定义法则将多个分区数据写入一个文件中,以下就是将多个RDD中相同分区索引的数据写入一个文件中,文件以主机名-精确化分的时间串-分区索引格式进行命令,代码实现如下:

result.foreachRDD(rdd=>{
  rdd.mapPartitionsWithIndex((idx,it)=>{
    //当前时间
    val now = new java.util.Date
    //格式化
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH-mm")
    //提取时间串
    val minStr = sdf.format(now)
    
    //主机名
    val host = InetAddress.getLocalHost.getHostName
    
    //
    val file = new File("H:\spark\streaming", 
                        host + "-" + minStr + "-" + idx+ ".txt")
    
    //文件输出流
    val out = new FileOutputStream(file, true)
    
    //写入整个分区数据
    for (e <- it) {
      out.write(e.toString().getBytes)
      out.flush()
    }
    out.close()
    it
  }).count()
})

8、Spark Streaming同Spark SQL集成

spark streaming中使用SQL,只需要创建Spark Session对象,将RDD转换成DataFrame即可,然后注册DataFrame成为临时视图后,就可以使用Spark SQL了,具体代码如下:

import java.io.{File, FileOutputStream}
import java.net.InetAddress
import java.text.SimpleDateFormat

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  *
  */
object WordCountStreamingSQLScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("streaming")
    conf.setMaster("local[8]")
    //创建SparkStreamingContext
    val sc = new StreamingContext(conf , Seconds(2)) ;
    sc.checkpoint("file:///H:\spark\streaming")

    //行流
    val lines = sc.socketTextStream("127.0.0.1" , 8888)
    
    lines.foreachRDD(rdd=>{
      val rdd1 = rdd.flatMap(_.split(" "))
      val rdd2 = rdd1.map((_,1))
      
      //创建SparkSession对象,使用RDD的配置即可
      val sess = SparkSession.builder()
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      
      //导入Spark Session隐式转换
      import sess.implicits._
      
      //转换RDD成为DataFrame,并注册成临时表
      rdd2.toDF("word" ,"cnt").createOrReplaceTempView("_wc")
      
      //执行sql操作
      sess.sql("select word , count(cnt) from _wc group by word").show(1000,false)
    })

    sc.start()
    sc.awaitTermination()
  }
}

注意:

sc.socketTextStream("127.0.0.1" , 8888)方法如果是本地的socket,需要使用127.0.0.1地址,使用localhost不好使,bug!!!!!

9、程序的部署

运行spark Streaming应用不能在spark-shell下执行,需要使用spark-submit命令提交执行。需要如下内容:

  • cluster manager

    通过--master指定master URL地址。

    $>spark-submit --master spark://s101:7077 ...
    
  • 导出jar包

    如果应用包含第三方组件,比如kafka,需要将所有的第三方类库导出到jar包中,spark自身的和Spark Streaming的包则不必。

  • 给executor配置足够的内存

    由于接收到的数据必须存在内存中,executor必须提供足够的内存来存储他们。如果又启用了window操作,最少要配置window长度容纳的数量。

    $>spark-submit --executor-memory 2g ...
    

  • 配置检查点

    如果应用中用到了检查点,必须使用hadoop兼容的具有容错存储能力的检查点目录, 比如hdfs或S3。流计算应用会在里面写入检查点信息供故障恢复。

    val sc = new StreamingContext(conf , Seconds(2)) ;
    //配置检查点目录
    sc.checkpoint("file:///H:\spark\streaming")
    

  • 配置Driver的自动重启

    若需要Driver故障时自动恢复,那么用来运行流计算应用的部署命令必须能够监控driver进程并在他故障时重启。不同的集群管理有着不同的工具可以实现这一点:

    • spark standalone

      standalone cluster模式支持应用程序非零退出后的自动重启,若要使用这一特性,可以在spark-submit命中增加--supervise参数来获得。如下所示:

      $>spark-submit --supervise --master spark://s101:7077 --class MyApp myapp.jar
      

      如果要杀死落入重复失败状况下的应用,可以执行以下命令:

      $>spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
      
    • YARN

  • 配置写前日志

    从spark1.2开始,引入了写前日志来获得强容错性保证。如果启用的话,接收到的所有数据都会写入到检查点设置的写前日志中。这可以防止driver恢复时数据丢失,由此可以确保流数据零流失。可用通过spark.streaming.receiver.writeAhea-dLog.enable设置为true来启用写前日志,但会导致每个Receiver接受吞吐量的降低,可以通过增加Receiver进行补偿。

    此外,如果开启了写前日志,可以禁用spark对接受的数据副本化存储,因为写前日志已经存储在了副本模式的文件系统中了。可以通过设置InputStream的持久化模式为StorageLevel.MEMORY_AND_DISK_SER来完成。代码如下:

    //启用写前日志
    conf.set("spark.streaming.receiver.writeAheadLog.enable" , "true")
    ...
    val lines = sc.socketTextStream("s101" , 8888)
    //只有一个副本
    lines.persist(StorageLevel.MEMORY_AND_DISK) ;
    
  • 设置最大接受速率

    资源不足时可以进行限速处理,Receiver类型可以通过spark.streaming.receiver.maxRate设置,kafka方式可以通过 spark.streaming.kafka.maxRatePerPartition设置。Spark1.5之后引入了压后机制,不再需要限速设置,Spark Steaming自动找出速率限制并进行动态调增。压后控制可以通过spark.streaming.backpressure.enabled=true开启。

    val conf = new SparkConf
    conf.set("spark.streaming.backpressure.enabled" , "true")
    

10、RDD的缓存管理

rdd执行结果可以进行缓存起来,以备后面使用rdd时不需要重复计算,直接提取计算结果即可。设置rdd缓存之后,必须unpersist之后才能继续再重新设置缓存级别。rdd可以缓存结果到内存中或磁盘中,如果是磁盘级别,保存数据到临时目录下,临时目录可以通过spark.local.dir进行修改。

// 缓存RDD
rdd.cache()
rdd.persist()
// 内存中缓存
rdd.persist(StorageLevel.MEMORY_ONLY)

RDD的缓存级别:

// 存储级别
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

11、RDD的检查点

调用rdd.checkpoint()方法时,会将rdd结果保存到检查点目录,检查点目录通过sc.setCheckPointDir()设置。

// 
sc.setCheckpointDir("file:///H:/chk")

12、修改spark的本地临时目录

conf.set("spark.local.dir" ,"file:///H:/tmp" );
原文地址:https://www.cnblogs.com/xupccc/p/9544621.html