spark总结

foreachRDD作用于DStream中每一个时间间隔的RDD;
foreachPartition作用于每一个时间间隔的RDD中的每一个partition;
foreach作用于每一个时间间隔的RDD中的每一个元素;

######################################################################
推荐系统中的矩阵分解技术
http://www.52nlp.cn/category/%E6%8E%A8%E8%8D%90%E7%B3%BB%E7%BB%9F

修改kafka topic的offset几种方法
https://blog.csdn.net/yxgxy270187133/article/details/53666760
    
spark零基础学习路线指导
https://blog.csdn.net/xiangxizhishi/article/details/75948088

idea快捷键使用及代码构建maven git
https://blog.csdn.net/Happy_wu/article/details/86583709

史上最详细的Hadoop环境搭建
https://blog.csdn.net/hliq5399/article/details/78193113

Hadoop 系列(一)基本概念
https://www.cnblogs.com/binarylei/p/8903601.html

大数据基础Hadoop 3.1.1 的高可用HA安装~踩坑记录
https://blog.csdn.net/fengruiqi/article/details/86498890


//********************************************************
ss=SparkSession.builder().enableHiveSupport().getOrCreate()
Rdd.persist(StorageLevel.MEMORY_AND_DISK)

    def getrealTime(pattern: String): String = {
        val timeTag = System.currentTimeMillis()
        val changeTime = new Date(timeTag)
        val dataFormat = new SimpleDateFormat(pattern)
        dataFormat.format(changeTime)
    }
    
//***********************************************
    val loop=new Breaks
    loop.breakable{
        if(true){//符合条件
            loop.break() //跳出循环
        }
    }

//********************************************************
1.1. spark streaming
    ①对实时流数据的处理有可扩展性,高吞吐量,可容错性等特点.
    ②    kafka,flume,witter,ZeroMQ,Kinesis等源获取数据;
        高阶函数map,reduce,join,window等组成复杂算法计算出数据;
        处理后的数据可以推送到文件系统,数据库,实时仪表盘.
        
1.2. StreamingContext
    
        val conf=new SparkConf().setAppName(appName).setMaster(master) //设定Master节点,设定应用名称
        val ssc=new StreamingContext(conf,Seconds(1))    //处理数据的时间间隔
    appName表示应用程序显示在集群UI上的名字;
    master是一个Spark,Mesos,YARN集群URL或者一个特殊字符串"local";
    
1.3. DStream
    离散流(discretized stream),代表连续的数据流.
    val sparkConf = new SparkConf().setAppname("HDFSWordCount").setMaseter("spark://172.19.1.232:7077")
    val ssc=new SteamingContext(sparkConf,Seconds(30)) //
    val lines = ssc.textFileStream("file:///home/spark/data")
    val words=lines.flatMap(_.split(" "))
    val wordCounts=words.map(x =>(x,1).reduceByKey(_+_))
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()



    
    



    
1.4 rdd={(a,1),(a,2),(b,3),(c,4),(c,5)}
    val rdd2=rdd.reduceByKey{(x,y)=>  //reduceByKey合并key相同的; 所以x,y都是指value
        x+y                 //rdd2={(a,3),(b,3),(c,9)}
    }





1.5 过滤空值
    rdd.filter(x=>{ !x._1.isEmpty && null !=x._1})






############################################################

java.io.NotSerializableException: org.apache.spark.SparkContext



############################################################
class FruitNumberProperties extends Serializable{
    val FruitNumberMap=Map("apple" -> "1","orange" -> "2")
}


//创建hdfs
val hdfs =org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://test"),new org.apache.hadoop.conf.Configuration())

//加载WebsiteConfig文件

    def loadWebsiteConfig(websiteConfigFile:String,allChannelSet:Set[String],ss:SparkSession):Map[String,Set[String]]={
        try{
            val channelConfig =ss.sparkContext.textFile(websiteConfigFile).collect()
            val filedArray = Array(("website","filed"),("channel","filed"))
            val result = channelConfig.filter(x=>x.nonEmpty).map{record =>
                val sparseJson = Util.funAnalysisJson(record,filedArray)
                val channelSet = if(sparseJson.length ==1 || (sparseJson.length >1 && sparseJson(1).trim.toLowerCase.equals("all"))){
                    allChannelSet
                }else{
                    sparseJson(1).split(",").toSet
                }
                (sparseJson(0),channelSet)
            }
            result.toMap
        }catch{
            case e: Exception =>
                println(websiteConfigFile+"load failed")
                Map.empty[String,Set[String]]
        }
    }




#####################################################################
Spark运行模式:
    1. local: 本地线程方式,主要用于开发调试
    Hadoop YARN: 集群运行在Yarn资源管理器上,资源管理交给Yarn,spark只负责进行任务调度和计算
    2. 各Spark应用程序以相互独立的进程集合运行于集群之上,由SparkContext对象进行协调,SparkContext对象可以视为Spark应用程序的入口,被称为driver program,SparkContext可以与不同种类的集群资源管理器(Cluster Manager),例如Hadoop Yarn、Mesos等 进行通信,从而分配到程序运行所需的资源,获取到集群运行所需的资源后,SparkContext将得到集群中其它工作节点(Worker Node) 上对应的Executors (不同的Spark应用程序有不同的Executor,它们之间也是独立的进程,Executor为应用程序提供分布式计算及数据存储功能),之后SparkContext将应用程序代码分发到各Executors,最后将任务(Task)分配给executors执行。
    3. RDD操作包含 Transformations和Action;所有的transformation都是lazy的

    




#######################################################
application
job
stage
task

master
worker
executor
driver


碰到第一个action算子的时候,相当于触发了sc.runjob方法执行

DAGScheduler工作:从action算子开始,到把stage变成TaskSet提交到taskScheduler中去执行结束

###########################################################
distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,repartition

数据倾斜的解决方案:
    ①使用Hive ETL预处理数据
        从根源上提前处理好hive表中的数据;
        (缺点:治标不治本,spark程序没有解决数据倾斜的能力)
    ②过滤少数导致倾斜的key
    ③提高shuffle的并行度
        实现简单,增加shuffle read task数量
        (缺点:缓解倾斜)
    ④两阶段聚合(局部聚合+全局聚合)
        第一阶段随机打乱,比如key可以加上前缀,将其分开;第二步再将前缀去掉
        (对于聚合类的shuffle操作,最好的解决方案)
    ⑤将reduce join转为 map join
        //reduce join 通用的join实现;但容易产生数据倾斜
        rdd1.join(rdd2)
        //map join  完美的避开了shuffle阶段,所以没有数据倾斜;但适用场景有限,只适合大小表做关联
        val bc=sc.broadCast(rdd1.toList)
        rdd1.foreachPartition(data =>{
            val data2=bc.value
            val data1=data
            data1.join(data2)
        })        
    ⑥采样倾斜key并分拆join操作
        将出现数据倾斜的key的所有数据,形成单独的数据集
        
    ⑦使用随机前缀和扩容的RDD进行join
    ⑧多种方案组合使用
#########################################################################
sc.read.format("json").load("/test/")
sc.read.json("test/")

df.write.json("test/")
df.write.format("json").save("/test/")

sc.read.load("test/xxx.parquet") //默认parquet格式,等价下一
sc.read.format("parquet").load("test/")

#######################################################################

①rdd.toDF
②sqlContext.createDataFrame(rdd)








#####################################################################
Block
输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block。

InputSplit
当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。
随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。

job
在spark rdd中,有action、transform操作,当真正触发action时,才真正执行计算,此时产生一个job任务

stage
以shuffle为界,当在一个job任务中涉及shuffle操作时,会进行stage划分,产生一个或多个stage。
Stage概念是spark中独有的。一般而言一个Job会切换成一定数量的stage。各个stage之间按照顺序执行。至于stage是怎么切分的,首选得知道spark论文中提到的narrow dependency(窄依赖)和wide dependency( 宽依赖)的概念。其实很好区分,看一下父RDD中的数据是否进入不同的子RDD,如果只进入到一个子RDD则是窄依赖,否则就是宽依赖。宽依赖和窄依赖的边界就是stage的划分点

task
一个stage可能包含一个或者多个task任务,task任务与partition、executor息息相关,即并行度。
Task是Spark中最新的执行单元。RDD一般是带有partitions的,每个partition的在一个executor上的执行可以任务是一个Task。

partition
partition个数即rdd的分区数,不同的数据源读进来的数据分区数默认不同,可以通过repartition进行重分区操作。

executor
executor运行在work上,一个work可以运行一个或多个executor,一个executor可以运行一个或者多个task(取决于executor的core个数,默认是一个task占用一个core,即有多少个core就可以启动多少个task任务)


Application
application(应用)其实就是用spark-submit提交的程序。比方说spark examples中的计算pi的SparkPi。一个application通常包含三部分:从数据源(比方说HDFS)取数据形成RDD,通过RDD的transformation和action进行计算,将结果输出到console或者外部存储(比方说collect收集输出到console)。

Driver
Spark中的driver感觉其实和yarn中Application Master的功能相类似。主要完成任务的调度以及和executor和cluster manager进行协调。有client和cluster联众模式。client模式driver在任务提交的机器上运行,而cluster模式会随机选择机器中的一台机器启动driver。从spark官网截图的一张图可以大致了解driver的功能。



######################################################################

schema: 表的字段的定义(名称,类型);当前这个表的数据存储目录
data:真实数据



离线计算   
批任务
实时处理   
流式处理
################################

storm  
    严格的一条数据计算一次,流式处理,但不一定是实时处理
    实时要求可以很高,吞吐量低
sparkStreaming
    一批数据计算一次(每个批次的时间间隔用户自由设置)
    实时要求低一点,吞吐量高一点(折中一点)
    流式计算是离线计算的一个特例
    
flink
    
################################
mapreduce: 每隔一段时间,执行一次任务,对两次任务之间的数据进行累积

storm: 一条一条的执行计算,(流式处理的核心思路);延迟低,亚秒级;数据消费至少一次(trident API 有且仅一次)

sparkStreaming: 很小的一批批的取执行计算(常见在1s到5min);伪实时;基于sparkCore,基于离线处理;有且仅一次

flink: 把离线处理看做是流式处理的一个特例,离散的批处理,基于流式处理;有且仅一次

#################################################

 
          节点            进程         线程
spark     worker          executor      task
storm     supervisor      worker        executor

###################################################
一站式通用解决方案

DStream:
    离散的RDD组成
    
UpdateStateByKey


输入数据流 → (接收器) →结果输出


有状态计算
无状态计算

#####################################################
窗口长度
滑动周期
二者都要是时间片的整数倍
每隔多长时间(滑动周期)计算过去多长时间(窗口长度)的数据

###################################################
package com.huawei.rcm.newsfeed.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}


object test38 {

  case class Fruit(name: String,age:Int,address:String)

  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder()
      .config("Spark SQL basic example","some-value")
      .master("local[2]")
      .appName("hello")
      //.enableHiveSupport()
      .getOrCreate()
    //enableHiveSupport,必须要有配置文件hive-site.xml
    val sc = ss.sparkContext
    val sqlContext =new SQLContext(sc)
    ss.sparkContext.setLogLevel("ERROR")


    val rdd=sc.parallelize(List(("apple",1,"China"),("strawberry",2,"China"),("banana",3,"America"),("orange",4,"France")))
    val fruitRDD: RDD[Fruit] = rdd.map(x=>Fruit(x._1,x._2,x._3))
    import ss.implicits._
    val fruitDF=fruitRDD.toDF("name","age","Address")
    fruitDF.show()
    //sql风格
    fruitDF.registerTempTable("fruitTable")

    val resutDF: DataFrame = sqlContext.sql("select * from fruitTable where age >2")
    resutDF.show()

    val resultDF2: DataFrame = ss.sql("select * from fruitTable where age >=2")
    resultDF2.show()


    sc.stop()
  }
}

###################################################
mapreduce  典型的离线处理计算引擎
storm     流失处理计算引擎
spark    离线处理+流式处理;基于离线处理的执行引擎来设计的;把流式处理看作是离线处理的特例
flink   流式处理+批处理;基于流式处理的执行引擎来进行设计的; 把离线处理看作是流式处理的特例


#############################################
storm核心概念:
    topology   一个storm的应用程序
    spout   数据源
    bolt    数据处理组件
    tuple   一个消息一条记录
    StreamingGrouping   分组规则
    
##############################################
实时流式处理的大致架构:
tomcat → log4j → logFile(实时监听) → flume(实时收集)  → kafka(缓冲作用,上游收集和下游消费的速度调节) →计算引擎(实时计算/流式计算)  → redis/mysql/hbase

离线:
flume → hdfs → mapreduce/hive → hbase/mysql/hdfs/redis

流式:
flume → kafka → storm/sparkStreaming → redis/mysql/hbase


flume既能做到监控文件的变化(tail -F exec),也能由事件进行驱动收集(spooldir)

##############################################
connection 不能序列化
所以connection在driver端,而写入数据在executor端,

连接池:第一次访问的时候,需要建立连接。 但是之后的访问,均会复用之前创建的连接


##########################################
checkpoint
    1.checkpoint的数据类型:
        元数据
        RDD数据
    2.合适启用checkpoint
        有状态计算:updateStateByKey,window
        如果有需要对diver进行HA
    3.如何配置checkpoint
        streamingContext.checkpoint(hdfspath)
        def functionToCreateStreamingContext():StreamingContext
        StreamingContext.getOrCreate(chkDir,functionToCreateStreamingContext)

        
###############################################
对数据的消费有三种语义:
at most once  最多一次,有可能会漏消费
at least once   最少一次,有可能重复消费
exactly once    有且仅有一次,效率低


#################################################
每一个stage中的task数量,都是有这个stage中最后一个RDD的分区数巨鼎



#####################################################
Resilient Distributed Dataset
弹性(可在内存/磁盘,分区可变)分布式数据集


transformation/转换
    延迟计算,rdd → rdd
action/行动
    触发sparkcontext提交job作业,rdd → 输出结果

宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition
窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

#############################################
在基于standalone的Spark集群,Cluster Manger就是Master。Master负责分配资源,在集群启动时,Driver向Master申请资源,Worker负责监控自己节点的内存和CPU等状况,并向Master汇报

每个worker可以起一个或多个Executor
每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task

一个application通过action划分不同job,在job中最后一个算子往前推按宽依赖划分不同stage

通过DAGScheduler划分阶段,形成一系列的TaskSet,然后传给TaskScheduler,把具体的Task交给Worker节点上的Executor的线程池处理。线程池中的线程工作,通过BlockManager来读写数据。
#############################################

原文地址:https://www.cnblogs.com/ShyPeanut/p/11065815.html