大数据:计算管理

  • 背景:

    • 2017 年,阿里内部 MaxCompute 集群上游 200 多万个任务,每天存储资源、计算资源消耗都很大。如何降低计算资源的消耗,提高任务执行的性能,提升任务产出的时间,是计算平台和 ETL 开发工程师孜孜追求的目标。

一、系统优化

  • 系统优化:通过分析计算系统的数据运行情况,判断计算系统对内存、CPU、Instance 个数这些资源的运用是否合理;
    • 合理:指计算时间更短,使用的资源更少;
  • Hadoop 等分布式计算系统的资源评估:
    • 评估方式:根据 Map 任务的输入的数据量进行静态评估;
      • 评估产出:是否存在长尾任务;
        • 长尾:多个 Instance 同时计算数据,但由于各个 Instance 上的数据量分布不均匀等情况,使得有的 Instance 先计算完,而有的要等执行很久,计算完成时间相差很大;
    •  弊端 / 问题:
      • 对于普通的 Map 任务,评估一般符合预期;而对于 Reduce 任务,其输入来自于 Map 的输出,但进行评估是也是根据 Map 任务的输出进行,这样评估的结果经常和实际需要的资源数相差很大;
    • 解决方案:
      • 在任务稳定的情况下,基于任务的历史执行情况进行评估,即采用 HBO(History - Based Optimizer,基于历史的优化器);
  • CBO(Cost - Based Optimizer,基于代价的优化器):
    • MaxCompute 的 CBO:

      • 在 Oracle 的 CBO 的基础上,改变了收集统计信息的方式:
        • 采用各种抽样统计算法,通过较少的资源获得大量的统计信息,基于先进的优化模型,具备了完善的 CBO 能力;
      • 优点:与传统的大数据计算系统相比,性能提升明显
    • Oracle 的 CBO:

      • 评估方式:根据收集到的表、分区、索引等统计信息,计算每种执行方式的代价(Cost),进而选择其中最优的(也就是代价最小的)执行方式
      • 优点:收集到的统计信息越多、越准确,Oracle 的 CBO 可能生出代价更小的执行计划;
      • 弊端 / 问题:
        1. 对表和列上统计信息的收集需要付出代价,尤其是在大数据环境下,表的体量巨大,需要消耗大量的资源来收集统计信息;
        2. 消耗大量资源收集到的统计信息,其利用率却很低;

1、HBO

  • HBO(History - Based Optimizer,基于历史的优化器),根据任务的历史执行情况,为任务分配更合理的资源;
    • 分配的资源内存、CPU、Instance 个数
      • Instance :指操作系统中一系列的进程以及为这些进程所分配的内存块;
  • HBO 是对集群资源分配的一种优化,概括起来就是:任务执行历史 + 集群状态信息 + 优化原则 → 更优的执行配置;

 1/1)背景

  1. MaxCompute 原资源分配策略

    1. MaxCompute 最初分配 MR 执行过程的 Instance个数的算法
      • MR执行过程:Map 任务、Reduce 任务;
    2. 在 Instance 分配算法的基础上,根据历史数据统计各个 Instance 处理的数据量
      1. Map Instance
        • fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) )
          [1] 0.00  4.11  16.59  60.66  4921.94
      2. Reduce Instance
        • fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) )
          [1]  0.00  0.00  0.75  24.87  192721.83
      3. Join Instance
        • fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) )
          [1]  0.00  0.02  1.82  22.15  101640.31
      • 从上面内容可以看出:
        1. 大部分的 Instance 处理的数据量远远没有达到预期,即一个 Instance 处理 256 MB 的数据;
        2. 有些 Instance 处理的数据量很大,很容易导致任务长尾;
      • 总结:默认的 Instance 算法下,小任务存在资源浪费,而大任务却资源不足;(需要有更合理的资源分配方法,HBO 应运而生)
  2. HBO 的提出

    • 问题分析及解决思路:通过数据分析,发现在系统中存在大量的周期性调度的脚本(物理计划稳定),且这些脚本的输入一般比较稳定,如果能对这部分脚本进行优化,那么对整个集群的计算资源的使用率会得到显著提升;
    • 使用 HBO,根据任务的执行历史情况,为其分配更合理的计算资源;
      • HBO 一般通过自适应调整系统参数,来达到控制计算资源的目的;

 1/2)HBO 原理

  • HBO 分配资源的步骤:
    1. 前提:最近 7 天内,任务代码没有发生变更,且任务运行 4 次;
    2. Instance 分配逻辑:基础资源估算值 + 加权资源估算值;
      • 最终的 Instance 个数为:基础资源估算值 + 加权资源估算值;
      • 加权资源:指对已经按分配逻辑进行分配好基础资源的每个 Map Task / Reduce Task,再追加一部分资源;
  1. 基础资源数量的逻辑

    1. 对于 Map Task:Map 数量平均每个 Map 能处理的数据量
      1. 估算用户提交的任务所需要的 Map 数量;
        • 根据期望的每个 Map 能处理的数据量,再结合用户提交任务的输入数据量,估算出用户提交的任务所需要的 Map 数量;
      2. 采用分层的方式,提供平均每个 Map 能处理的数据量;
        • 分层的目的:为了保证集群上任务的整体吞吐量,保证集群的资源不会被一些超大任务占有;
    2. 对于 Reduce Task:Reduce 数量平均每个 Reduce 能处理的数据量
      • 计算 Reduce 数量的方法
        • Hive 的计算方法:使用 Map 的输入数据量计算 Reduce 的数量;
        • MaxCompute 的计算方法:使用最近 7 天 Reduce 对应的 Map 的平均输出数据量,作为 Reduce 的输入数据量,用于计算 Reduce 的数量;
      1. 估算用户提交的任务所需要的 Reduce 数量
        • 根据期望的每个 Reduce 能处理的数据量,再结合用户提交任务的 Reduce 的输入数据量,估算出用户提交的任务所需要的 Reduce 数量;
      2. 采用分层的方式,提供平均每个 Reduce 能处理的数据量;
        • 分层的目的:为了保证集群上任务的整体吞吐量,保证集群的资源不会被一些超大任务占有;
  2. 加权资源数量的逻辑

    • 加权资源:对每个 Map Task 和 Reduce Task,在其基础资源的基础上再追加一部分资源;
    • 以下是追加原理:追加多少、怎么追加
    1. 对于 Map Task
      1. 系统先初始化期望的每个 Map 能处理的数据量;
      2. 拿该 Map 在最近一段时间内的平均处理速度,与系统设定的期望值做比较:
        • 如果平均处理速度小于期望值,则按照同等比例对基础资源数量进行加权,估算出该 Map 的加权资源数量;
        • 一段时间:一般用最近 7 天;
    2. 对于 Reduce Task
      • 方法同 Map Task;
  3. CPU / 内存分配逻辑

    • 类似于 Instance 分配逻辑:基础资源估算值 + 加权资源估算值;

 1/3)HBO 效果

  1. 提高 CPU 利用率
  2. 提高内存利用率
  3. 提高 Instance 并发数
  4. 降低执行时长

 1/4)HBO 改进与优化

  • 特殊任务场景:有些业务在特定场合下依旧有数据量暴涨的情况发生,如,“双 11” 和 “双 12” 期间,这个日常生成的 HBO 计划就不适用了;
  • 解决方法HBO 增加了根据数据量动态调整 Instance 数的功能,主要根据 Map 的数量增长情况进行调整

2、CBO

  • MaxCompute 引入的基于代价的优化器(CBO):根据收集的统计信息,计算每种执行方式的代价,进而选择最优的执行方式;

 2/1)优化器原理

  • 优化器(Optimizer)引入了 Volcano 模型,该模型是基于代价的优化器(CBO),并且引入了重新排序 Join(Join Reorder)自动 MapJoin(Auto MapJoin)优化规则等,同时基于 Volcano 模型的优化器会尽最大的搜索宽度来获取最优计划;
  • 优化器功能结构模块:
    • Meta Manager(元数据)、Statistics(统计信息)、Rule Set(优化规则集)、Volcano Planner Core(核心计划器)等,如下图:
  1. Meta Manager

    • 功能:提供元数据信息
      • 元数据信息:包括表的元数据、统计信息元数据等;以及一些基本的元数据,如是否是分区表、表有哪些列等;
      • 当优化器在选择计划时,需要根据元数据的一些信息进行优化;
        • 如,表分区裁剪(TableScan Partition Prunning)优化时,需要通过 Meta 信息获取表数据有哪些分区,然后根据过滤条件来裁剪分区。
  2. Statistics

    • 功能:提供准确的统计信息
      • 统计信息:如,表的 Count 值、列的 Distinct 值、TopN 值等;
      • 收集统计信息:优化器提供了 UDF 来收集统计信息;(包括 Distinct 值、TopN 值等)
        • 注:Count 值等信息是由 Meta 直接提供的;
      • 优化器只有拥有准确的统计信息,才能计算出真正的最优的计划;
        • 如,Join 是选择 Hash Join 还是 Merge Join,优化器会根据 Join 的输入数据量(即 Count 值)来进行选择;
  3. Rule Set

    • 功能优化规则
      • 选择:根据不同情况选择不同的优化点,再由优化器根据代价模型(Cost Model)来选择启用哪些优化规则;
        • 如,工程合并规则(Project Merge Rule):将临近的两个 Project 合并成一个 Project;
        • 如,过滤条件下推规则(Filter Push Down):将过滤条件尽量下推,使得数据先进行过滤,再进行其他计算;(以较少其他操作的数据量)
    • 优化规则分类
      • Substitute:被认为是优化了肯定好的规则;
      • Explore Rule:优化后需要考虑多种优化结果;
      • Build Rule:可以认为优化后的结果不能再次使用规则再进行优化;
    • 所有的优化规则,都放在优化规则集中;
    • MaxCompute 优化器中的优化规则,由用户通过 set 等命令控制使用;
  4. Volcano Planner Core

    • 功能:将所有信息(Meta 信息、统计信息、优化规则)统一起来处理,然后根据代价模型的计算,获得一个最优计划;
    1. 代价模型
      • 功能 / 原理:代价模型根据不同操作符(如,Join、Project 等)计算出不同的代价,然后再计算出整个计划中最小代价的计划
      • MaxCompute 的代价模型提供的 Cost 由 3 个维度组成:行数I / O 开销CPU 开销;(3 个衡量标准)
        • 通过这 3 个维度衡量每个一操作符的代价;
    2. 工作原理:
      • 将需要输入给 Planner 的数据,用 Compiler 解析为一个 “计划树”,简称 “RelNode 树”,树的每个节点简称 RelNode
    3. Volcano Planner 创建
      • Planner 的创建:主要是将 Planner 在优化过程中所用到的信息传递给执行计划器;如规则集,用户指定要使用的规则
        • 信息:RelNode 的 Meta 计算值、RelNode 的代价计算值;
        • 信息的由来:
          • Meta Provider:每个 RelNode 的 Meta 计算;
            • 如,RowCount 值计算、Distinct 值计算等;
          • 代价模型:计算每个 RelNode 的代价等;
    4. Planner 优化

      • Planner 的优化过程:

      1. 规则匹配(Rule Match)

        • 规则匹配:指 RelNode 满足规则的优化条件而建立的一种匹配关系;(就是给所有的 RelNode,在规则集中找相匹配的规则
        • 操作步骤:
          • Planner 首先将整个 RelNode 树的每一个 RelNode 注册到 Planner 内部;同时在注册过程中,在规则集中找到与每个 RelNode 匹配的规则,然后加入到规则应用(Rule Apply)的队列中;
            • 整个注册过程处理结束后,所有与 RelNode 可以匹配的规则,全部加入到队列中,以后应用时只要从队列中取出来就可以了;
      2. 规则应用(Rule Apply)

        • 主要任务:优化每个规则队列中的规则
        • 优化过程:
          1. 从规则队列(Rule Queue)中弹出(Pop)一个已经匹配成功的规则进行优化
            • 如果优化成功后,会产生至少一个新的 RelNode;新的 RelNode 与未优化时的 RelNode 存在差异;
            • 弹出:弹出一个规则后,规则队列中就少一个规则;
          2. 使用新的 RelNode 再次进行注册以及规则匹配操作,再把匹配的规则加入到规则应用的规则队列中,然后接着下次规则应用;
          3. 结束对规则的优化:
            • Planner 会一直应用所有的规则,包括后来叠加的规则,直到不会有新的规则匹配到,则优化结束,得到一个最优计划
            • 产出:新的 “RelNode 树”,也就是新的 RelNode 数据节点集合;
      3. 代价计算(Cost Compute)

        • 代价计算的时期:
          • 每当规则应用之后,如果规则优化成功,则会产生新的 RelNode,在新的 RelNode 注册过程中,有一个步骤是计算 RelNode 的代价;
        • 代价计算的过程:
          1. 由代价模型对每个 RelNode 的代价进行估算和累加:
            1. 如果不存在代价,或者 Child 的代价还没有估算(默认是最大值),则忽略;
            2. 如果存在代价,则会将本身的代价和 Child (即输入的所有 RelNode)的代价进行累加;
          2. 若累加结果小于 Best(期望值),则认为优化后的 RelNode 是当前最优的;并且会对其 Parent 进行递归估算代价,即传播代价计算(Propagate Calculate Cost);
            • Parent :指与每个 RelNode 对应的没有被解析前的数据;
            • 思考:

              1. 代价评估的过程和规则优化的过程是同步的,如果已经找到满足的方案(累积代价小于 Best),但是规则优化还未结束,是否要继续优化规则?
              2. 如果还继续优化规则,是不是可以找到多种满足期望的方案?
              3. 是不是要对比所有满足期望的方案的累积代价,选择最小的代价对应的方案,作为最优的方案?

 2/2)优化器新特性(或者说是新功能)

  1. 重新排序 Join(Join Reorder)

    • Join 是关系型数据库中最重要的操作符之一,Join 的性能也直接关系到整个 SQL 的性能;
    • Join 排序算法的两种实现:MapJoin、Merge Join;
      • 对于小数据量,MapJoin 比 Merge Join 性能更优;
    • 功能将 Join 的所有不同输入进行一个全排序,找到代价最小的一个排列
      • 业务背景:排序之前只是保持了用户书写 SQL 语句的 Join 顺序,这样的 Join 顺序不一定是最优的,所以通过重排序 Join 规则可以实现最好的选择,提供更优的性能;
  2. 自动 MapJoin(Auto MapJoin)

    • 功能:将 Join 的所有不同输入进行一个全排序,找到代价最小的一个排列
    • 实现方式:充分利用优化器代价模型进行估算,获得更优的 MapJoin 方式,而不是通过 Hint 方式来处理;
    • 业务背景:之前通过 Hint 方式来指定是否使用 MapJoin,这样对用户不是很友好,且使用不方便;

 2/3)优化器使用

  • MaxCompute 优化器提供了许多优化规则,将内部已经实现的优化规则进行分类,并提供 set 等命令方便用户使用;
    • 一些基础优化规则都会默认打开,用户可以自己任意搭配使用;
  • 优化器提供的 Flag 有:

    • 规则白名单 —— odps.optimizer.cbo.rule.filter.white
    • 规则黑名单 —— odps.optimizer.cbo.rule.filter.black
  • 使用规则:

    • 使用优化规则:将需要使用的优化规则的缩写名称加入白名单即可;
      • 例 2:set odps.optimizer.cbo.rule.filter.white = pojr, hj ;
        • 表示:使用重排序 Join 规则和自动 MapJoin 规则;
        • 重排序规则 Join = pojr 、自动 MapJoin = hj ;
    • 关闭优化规则:将想要关闭的优化规则的缩写名称加入黑名单即可;
      • 例 1:set odps.optimizer.cbo.rule.filter.black = xxx, yyy ;
        • 表示:将优化规则 xxx 和 yyy 关闭;

 2/4)注意事项

  • 背景:

    • 由于用户书写 SQL 语句时可能存在一些不确定因素,所有应尽量避免这些因素带来的性能影响,甚至结果非预期;
  • 例:Optimizer 会提供谓词下推(Predicate Push Down)优化,主要目的是尽量早的进行谓词过滤,以减少后续操作的数据量,提供性能;但需要注意的是:
    1. UDF

      • 优化器不会任意下推带有用户意图的函数
        • 原因:不同用户书写的函数含义不一样,不可以一概而论;
        • 解决方法:如果用户需要下推 UDF,需要自己改动 SQL;
          • 好处:用户自己控制 UDF 执行的逻辑,最了解自己的 UDF 使用在 SQL 的哪个部分,而不是优化器任意下推;
    2. 不确定函数

      • 优化器不会任意下推不确定函数;(如,sample 函数)
        • 例 1:如果用户将 sample 函数写在 where 子句中,同时语句中存在 Join,则优化器是不会下推到 TableScan 的,因为可能改变愿意;
          • SELECT *
            FROM t1
            JOIN t2
            ON t1.c1 = t2.d1
            WHERE sample( 4, 1) = true ;
            • sample 函数在 Join 之后执行,而不会直接在 TableScan 后执行;
        • 例 2:如果用户需要对 TableScan 进行抽样,则需要自己修改 SQL来达到目的,否则优化器进行下推可能会错误的理解用户意图
          • SELECT *
            FROM
            (
                SELECT *
                FROM t1
                WHERE sample(4, 1) = true
            ) t1
            JOIN t2
            ON t1.c1 = t2.d1 ;
    3. 隐式类型转换

      • 书写 SQL 语句时,应尽量避免 Join Key 存在隐式类型转换
        • 如,String = Bigint,会转换为 ToDouble(String) = toDouble(Bigint),这是不是用户的原版意图,数据库本身不得而知;
      • 存在隐式类型转换可能会引发两种后果:
        1. 转换失败,报错;
        2. 虽然转换成功了,但结果与用户期望的不一致;

二、任务优化

  • SQL / MR 从提交到最后执行,在 MaxCompute 中的细化步骤:
    1. SQL / MR 作业一般会生成 MapReduce 任务,在 MaxCompute 中则会生成 MaxCompute Instance,通过唯一 ID 进行标识
    2. Fuxi Job:对于 MaxCompute Instance,会生成一个或多个由 Fuxi Task 组成的有向无环图,即 Fuxi Job;
      • Fuxi Task:也就是 Map 端的一个熟人分片数据;
      • MaxCompute Instance 和Fuxi Job 类似于 Hive 中 Job 的概念;
    3. Fuxi Task(任务类型):主要包含三种类型,分别是 Map、Reduce、Join,类似于 Hive 中 Task 的概念;
    4. Fuxi Instance:真正的计算单元,和 Hive 中的概念类似,一般和槽位(slot)对应;

1、Map 倾斜

  • Map 倾斜:数据在 Map Instance 上的分布不均匀,即有的 Map Instance 上分布的数据量很大,有的 Map Instance 分布的数据量很少;
    • 后果(Map 端长尾现象):有的 Map Instance 的资源浪费,有的 Map Instance 的资源不够,计算所用时间很长,导致最终整个 Map 端的计算时间变长;

 1/1)背景

  • Map 端是 MR 任务的起始阶段,Map 端的主要功能是从磁盘中将数据读入内存,Map 端的两个主要过程,如下图:
      1. 输入分片

        • 每个输入分片会让一个 Map Instance 来处理;
        • 默认情况下,一个 Pangu 文件系统的一个文件快的大小(默认为 256 MB)为一个分片(Fuxi Task);
      2. Map 读数据阶段

        • 调节 Map Instance 的个数:如,set  odps.mapper.split.size = 256;
        • 控制每个 Map Instance 读取文件的个数:如,set odps.mapper.merge.limit.size = 64;
          • 如果输入数据的文件大小差异比较大,每个 Map Instance 读取的数据量和读取时间差异也会很大;(长尾现象 / Map 倾斜)
      3. Map Instance 输出结果时

        • 输出结果会暂时放在一个环形内存缓冲区;(当该缓冲区快要溢出时会在本地文件系统中创建一个溢出文件,即 Write Dump
      4. 写入磁盘

        • 在写入磁盘之前,线程首先根据 Reduce Instance 的个数划分分区,数据将会根据 Key 值 Hash 到不同的分区上,一个 Reduce Instance 对应一个分区的数据;
          • Reduce Instance 的个数的确定,在下面的 Reduce 倾斜中介绍;
        • Map 端也会做部分聚合操作,以减少输入 Reduce 端的数据量;
      • 问题:

        1. 由于各个 Map Instance 的数据是根据 Hash 分配的,因此也会导致有些 Reduce Instance 分配到大量的数据,而有些 Reduce Instance 却分配到很少数据,甚至没有分配到数据;(也就是 Map 端数据倾斜,会连带影响 Reduce 端数据倾斜;)
        2. 在 Map 端读数据时,由于读入数据的文件大小分布不均匀,因此会导致有些 Map Instance 读取并且处理的数据特别多,而有些 Map Instance 处理的数据特别少,造成 Map 端长尾;一般有两种情况可能会导致 Map 端长尾:
          1. 上游表文件的大小特别不均匀,并且小文件特别多,导致当前表 Map 端读取的数据分布不均匀,引起长尾
            • 上游表文件:数据仓库中的维表和事实表;
          2. Map 端做聚合时,由于某些 Map Instance 读取文件的某个值特别多而引起长尾,主要指 Count Distinct 操作

 1/2)方案

  1. 针对第一种情况导致的 Map 端长尾

    • 情况一:上游表文件的大小特别不均匀,并且小文件特别多,导致当前表 Map 端读取的数据分布不均匀,引起长尾
    • 优化方案合并上游的小文件同时调节本节点的小文件的参数
      • 两个参数:
        1. 一种参数,用于调节 Map 任务的 Map Instance 的个数;
          • 例:set  odps.sql.mapper.merge.limit.size = 64 ;
        2. 另一种参数,用于调节单个 Map Instance 读取的小文件个数;
          • 例:set  odps.sql.mapper.split.size = 256 ;
  2. 针对第二种情况导致 Map 端长尾

    • 情况二:Map 端做聚合时,由于某些 Map Instance 读取文件的某个值特别多而引起长尾,主要指 Count Distinct 操作
      • 聚合:指数据将要从 Map 端输出,然后徐进入 Reduce 端的时候,对数据在 Map 端做的操作;
      • 聚合目的:减少 Reduce 端的数据量;
    • 实例说明由于某个值特别多而引起长尾:

      • 例:获取收集 APP 日志明细中的前一个页面的页面组信息;
        • pre_page:前一个页面的页面标志;
        • page_ut 表:存储的手机 APP 的页面组;
        • pre_page 只能通过匹配正则或者所属的页面组信息,进行笛卡尔积 Join;
        • 思路:pre_page 只能通过正则或者所属的页面组信息,进行笛卡尔积 Join;
        • 原始代码:
          • SELECT ...
            FROM
            (
                SELECT ds
                              , unique_id
                              , pre_page
                FROM tmp _app_ut_1
                WHERE ds = '${bizdate}'
                AND pre_page is not null
            ) a
            LEFT OUTER JOIN
            (
                SELECT t.*
                             , length(t.page_type_rule) rule_length
                FROM    page_ut t
                WHERE ds = '${bizdate}'
                AND      is_enable = 'Y'
            ) b
            ON 1 = 1
            WHERE a.pre_page rlike b.page_type_rule ;
        • 运行代码,LogView 日志如下图:
          1. L1_Stg4:MapJoin 小表的分发阶段;
          2. M3_Stg1:读取明细日志表的 Map 阶段;与 MapJoin 小表的 Join 操作也发生在这个阶段;
          3. R5_3_Stg2:进行分组排序的阶段;
        • 问题分析:

          • M3_Stg1 阶段,单个 Instance 的处理时间达到了 40 分钟,而且长尾的 Instance 个数比较固定,应是不同的 Map 读入的文件块分别不均匀导致的,文件块大的 Map 数据量比较大,在与小表进行笛卡尔积 操作时,非常耗时,造成 Map 端长尾
        • 解决方法:

          • 使用 “distribute by rand()” 来打乱数据分布,使数据尽可能分布均匀
        • 修改后的代码如下
          • SELECT ...
            FROM
            (
                SELECT ds
                              , unique_id
                              , pre_page
                FROM tmp _app_ut_1
                WHERE ds = '${bizdate}'
                AND pre_page is not null
                DISTRIBUTE BY rand()
            ) a
            LEFT OUTER JOIN
            (
                SELECT t.*
                             , length(t.page_type_rule) rule_length
                FROM    page_ut t
                WHERE ds = '${bizdate}'
                AND      is_enable = 'Y'
            ) b
            ON 1 = 1
            WHERE a.pre_page rlike b.page_type_rule ;
        • 执行上述代码,LogView 日志如下图:
        • 原因分析:

          • 通过 “distribute by rand()” ,将 Map 端分发后的数据重新按照随机值再进行一次分发;
            1. 不加人随机分配函数时:Map 阶段需要与使用 MapJoin 的小表进行笛卡尔积操作,Map 端完成了大小表的分发和笛卡尔积操作;
            2. 加入随机分配函数后:Map 端只负责数据的分发,不再有复杂的聚合或笛卡尔积操作,因此不会导致 Map 端长尾;

 1/3)思考

  • Map 端长尾的根本原因由于读入的文件块的数据分布不均匀,再加上 UDF 函数性能、Join、聚合操作等,导致读入数据量大的 Map Instance 耗时较长
  • 实际开发过程中,如果遇到 Map 端长尾情况,解决思路:

    1. 首先,考虑如何让 Map Instance 读取的数据量足够均匀;
    2. 然后,判断是哪些操作导致 Map Instance 比较慢;
    3. 最后,考虑这些操作是否必须在 Map 端完成,在其他阶段是否会做得更好;

2、Join 倾斜

  • Join 的功能:MaxCompute SQL 在 Join 执行阶段会将 Join Key 相同的数据分发到同一个执行 Instance 上处理

 2/1)背景

  1. Join 的执行原理

    • Join 操作需要参与 Map 和 Reduce 的整个阶段;
    • 例:通过 Join 的SQL 代码,来看整个 Map 和 Reduce 阶段的执行过程以及数据变化,进而对 Join 执行原理有所了解;
      • SELECT student_id, student_name, course_id
        FROM    student
        LEFT JOIN student_course
        ON student.student_id = student_course.student_id ;
      • 过程理解:MaxCompute SQL 在 Join 执行阶段会将 Join Key 相同的数据分发到同一个执行 Instance 上处理;
        • 长尾情况:如果某个 Key 上的数据量比较大,则会导致该 Instance 执行时间较长;
        • 长尾表现:在执行日志中该 Join Task 的大部分 Instance 都已执行完成,但少数几个 Instance 一致处于执行中;
  2. MaxCompute SQL 执行中的 Join 阶段的 3 中数据倾斜场景

    1. Join 的某输入比较小
      • 可以采用 MapJoin,避免分发引起长尾;
    2. Join 的每路输入都较大,且长尾是空值导致的
      • 可以将空值处理成随机值,避免聚集;
    3. Join 的每路输入都较大,且长尾是热点值导致的
      • 可以对热点值和非热点值分别进行处理,再合并数据;
  3. 如何确认 Join 是否发生数据倾斜
    1. 打开 MaxCompute SQL 执行时产生的 LogView 日志,点开日志可以看到每个 Fuxi Task 的详细执行信息,如下图:
      • 可以看到每一个 Map、Join、Reduce 的Fuxi Task 任务;
    2. 根据上图,点击其中一个 Join 任务,可以看到有 115 个 Instance 长尾;再点击 StdOut,可以查看 Instance 读入的数据量,如下图:
        • 图 13.10 显示,Join 的一路输入读取的数据量是 1389941257 行;
        • 长尾情况:如果 Long-Tails 中 Instance 读入的数据量远超过其他 Instance 读取的数据量,则表示某个 Instance 处理的数据量超大导致长尾;

 2/2)方案

  • 针对上面的 3 中倾斜场景,给出 3 中对应的解决方案
  1. Join 的某输入比较小

    • 方案采用 MapJoin
    1. MapJoin的原理
      • MapJoin 的原理:将 Join 操作提前到 Map 端执行,将小表读入内存,顺序扫描大表完成 Join;
        • 优点:可以避免因为分发 Key 不均匀导致数据倾斜;
        • 弊端:MapJoin 的使用有限制,必须是 Join 中的从表比较小才可用;
          • 从表:左外连接中的右表,或者右外连接中的左表;
    2. MapJoin 的使用方法
      • 具体操作在代码中 select 后加上 “/*+mapjoin(a)*/” 即可
        • a:代表小表(或者子查询)的别名;
        • 例:MaxCompute 已经可以自动选择是否使用 MapJoin ,可以不使用显式 Hint:
          • SELECT    /*+MAPJOIN(b)*/
                            a.c2
                            ,b.c3
            
            FROM
            (
                    SELECT    C1
                             ,C2
                    FROM      t1
            ) a
            LETF OUTER JOIN
            (
                    SELECT    c1
                             ,c3
                    FROM      t2
            ) b
            on    a.c1 = b.c1;
    • 使用 MapJoin 时,对小表的大小有限制,默认小表读入内存后的大小不能超过 512 MB,但是用户可以通过设置 “set odps.sql.mapjoin.memory.max = 2048” 加大内存,最大为 2048 MB;
  2. Join 因为空值导致长尾

    • 方案将空值处理成随机值
    • 原因:空值无法关联上,只是分发到一处,因此处理成随机值既不会影响关联结果,也能很好的避免聚集导致长尾;
    • 例:
      • SELECT    ...
        FROM                        table_a
        LEFT OUTER JOIN      table_b
        ON    coalesce(table_a.key, rand()*9999) = table_b.key    --当 key 值为空值时用随机值代替
  3. Join 因为热点值导致长尾

    • 场景:因为热点值导致长尾,且 Join 的输入比较大,无法使用 MapJoin;
    • 方案先将热点 key 取出,用热点 key 将主表数据切分成热点数据和非热点数据两部分,分别处理,最后合并
    • 以实例阐述操作步骤:
      • 例:淘宝的 PV 日志关联商品维表,取商品属性为例;
        1. 取热点 key:将 PV 大于 50000 的商品 ID 取出到临时表中
          • INSERT    OVERWRITE TABLE topk_item
            SELECT    item_id
            FROM
            (
                            SELECT    item_id
                                            ,count(1) as cnt
                            FROM        pv        --pv表
                            WHERE    ds = '${bizdate}'
                            AND        url_type = 'ipv'
                            AND        item_id is not null
                            GROUP BY item_id
            ) a
            WHERE    cnt >= 50000
        2. 取出非热点数据
          • 操作步骤:将主表(pv 表)和热点 key 表(topk_item 表)外关联后,通过条件 “bl.item_id is null” 取关联不到的数据,即非热点商品的日志数据;此时需要使用 MapJoin;再用热点数据关联商品维表;(因为已经排除了热点数据,所以不会长尾;)
          • 代码示例:
            • SELECT ...
              FROM
              (
                      SELECT    *
                      FROM        item        --商品表
                      WHERE    ds = '${bizdate}'
              ) a
              RIGHT OUTER JOIN
              (
                      SELECT    /*+MAPJOIN*/
                                      b2.*
                      FROM
                      (
                                      SELECT    item_id
                                      FROM        topk_item        --热点表
                                      WHERE    ds = '${bizdate}'
                      ) b1
                      RIGHT OUTER JOIN
                      (
                                      SELECT    *
                                      FROM        pv            --PV表
                                      WHERE    ds = '${bizdate}'
                                      AND          url_type = 'ipv'
                      ) b2
                      ON b1.item_id = coalesce(b2.item_id, concat("tbcdm", rand())
                      WHERE    b1.item_id is null
              ) 1
              ON a.item_id = coalesce(1.item_id, concat("tbcdm", rand())
        3. 取出热点数据
          1. 取到热点商品的日志数据:将主表(pv 表)和热点 key 表(topk_item 表)内关联,此时需要使用 MapJoin;
          2. 取到热点商品的维表数据:同时,需要将商品维表(item 表)和热点 key 表(topk_item 表)内关联;
            • 因为维表数据只有热点商品的数据,数据量比较小,可以使用 MapJoin 避免长尾;
          3. 将上两步汇总的日志数据外关联维表数据;
            • SELECT    /*+MAPJOIN*/
                              ...
              FROM
              (
                              SELECT    /*+MAPJOIN*/
                                              b2.*
                              FROM
                              (
                                              SELECT    item_id
                                              FROM        topk_item
                                              WHERE     ds = '${bizdate}'
                              ) b1
                              JOIN
                              (
                                              SELECT    *
                                              FROM        pv        --pv 表
                                              WHERE    ds = '${bizdate}'
                                              AND          url_type = 'ipv'
                                              AND          item_id is not null
                              ) b2
                              ON            (b1.item_id = b2.item_id)
              ) 1
              LEFT OUTER JOIN
              (
                              SELECT    /*+MAPJOIN*/
                                              a2.*
                              FROM
                              (
                                              SELECT    item_id
                                              FROM       topk_item
                                              WHERE    ds = '${bizdate}'
                              ) a1
                              JOIN
                              (
                                              SELECT    *
                                              FROM        item        --商品表
                                              WHERE    ds = '${bizdate}'
                              ) a2
                              ON            (a1.item_id = a2.item_id)
              ) a
              ON            a.item_id = 1.item_id
        4. 将上面取到的非热点数据和热点数据通过 “union all” 合并后,即得到完整的日志数据,并且关联了商品信息;
  • 针对倾斜问题,MaxCompute 系统提供了专门的参数用来解决长尾问题:

    • 例 1:开启 / 关闭功能
      • set odps.sql.skewjoin = true / false
    • 例 2:设置倾斜的 key 及对应的值
      • set odps.sql.skewinfo = skewed_src: (skewed_key)    --设置倾斜的 key 值(skewed_ey)
    • 优点:简单方便;
    • 弊端:
      1. 如果倾斜值发生变化,需要修改代码,而且一般无法提前知道变化;
      2. 如果倾斜至比较多,则不方便在参数中设置;
    • 需要根据实际情况,选择拆分代码或者设置参数;

 2/3)思考

  • 当大表和大表 Join因为热点值发生倾斜时,虽然可以通过修改代码来解决,但是修改起来很麻烦,代码改动也很大,且影响阅读;而 MaxCompute 现有的参数设置使用不够灵活,倾斜值多的时候,不可能将所有值都列在参数中,且倾斜值可能经常变动;

3、Reduce 倾斜

 3/1)背景

  • Reduce 端对 Map 端梳理后的有序 key-value 键值对进行聚合,即 进行 Count、Sum、Avg 等聚合操作,得到最终聚合的结果
  • Distinct:MaxCompute SQL 中支持的语法;
    • 功能:用于对字段去重;
    • 原理:将需要去重的字段以及 Group By 字段联合作为 key 将数据分发到 Reduce 端;
  • Reduce 端长尾原因:key 的数据分布不均匀;

    • 因为 Distinct 操作,数据无法在 Map 端的 Shuffle 阶段根据 Group By 先做一个聚合操作,以减少传输的数据量,而是将所有的数据都传输到 Reduce 端,当 key 的数据分布不均匀时,就会导致 Reduce 端长尾;
  • 造成 Reduce 端长尾的 4 种情况:

    1. 对同一个表按照维度对不同的列进行 Count Distinct 操作,造成 Map 端数据膨胀,从而使下游的 Join 和 Reduce 出现链路上的长尾;
    2. Map 端直接做聚合时,出现 key 值分布不均匀,造成 Reduce 端长尾;
    3. 动态分区数过多时,可能造成小文件过多,从而引起 Reduce 端长尾;
    4. 多个 Distinct 同时出现在一端 SQL 代码中时,数据会被分发多次,不仅会造成数据膨胀 N 倍,还会把长尾现象放大 N 倍;

 3/2)方案

  • 针对上述的造成 Reduce 端长尾的 4 中情况,给出解决方案;
  1. Map 端直接做聚合时,出现 key 值分布不均匀,造成 Reduce 端长尾

    • 解决方案对热点 key 单独处理,然后通过 “Union All” 合并;(具体操作与 “Join 因为热点值导致长尾” 的处理方式一样)
  2. 动态分区数过多时,可能造成小文件过多,从而引起的 Reduce 端长尾

    • 解决方案:把符合不同条件的数据放到不同的分区,避免通过多次 “Insert Overwrite” 写入表中,特别是分区数比较多时,能够很多的简化代码;
      • 弊端:动态分区也可能带来小文件过多的困扰;
      • 例:
        • INSERT OVERWRITE TABLE part_test PARTITION(ds)
          SELECT    *
          FROM        part_test ;
        • 假设有 K 个 Map Instance,N 个目标分区,那么在最坏的情况下,可能产生K x N 个小文件,而过多的小文件会对文件系统造成巨大的管理压力;
        • 解决方法:
          • MaxCompute 对动态分区的处理是引入额外一级的 Reduce Task,把相同的目标分区交由同一个(或少量几个)Reduce Instance 来写入,避免小文件过多,并且这个 Reduce 肯定是最后一个 Reduce Task 操作;(MaxCompute 是默认开启这个功能的,也就是将下面参数设置为 true)
          • set odps.sql.reshuffle.dynamicpt = true;
  3. 多个 Distinct 同时出现在一段 SQL 代码中时,数据会被分发多次,不仅会造成数据膨胀 N 倍,还会把长尾现象方法 N 倍

    • 例:在 7 天、30天等时间范围内,分 PC 端、无线端、所有终端,计算支付买家和支付商品数,其中支付买家数和支付商品数指标需要去重;
      • 方案一使用 Distinct 去重

        • 因为需要根据日期、终端等多种条件组合对买家和商品进行去重,因此需要有 12 个 Count Distinct 计算;
        • 下图为统计代码和该代码运行的 LogView 日志,可以看出节点运行时间:1 h 14 min,数据膨胀;
      • 方案二:不使用 Distinct 去重
        1. 计算支付买家数:
          1. 先分别进行查询,执行 Group By 原表粒度 + buyer_id,计算出 PC 端、无线端、所有终端以及 7 天、30 天等统计口径下的 buyer_id(这里可以理解为买家支付的次数);
          2. 在子查询外,Group By 原表粒度:当上一步的 Count 值大于 0 时,说明这一买家在这个统一口径下油锅支付,计入支付买家数,否则不计入;
        2. 计算支付商品数:
          • 与计算支付买家数的方法一样,按照上两步操作进行;
        3. 对支付买家数和支付商品数进行 Join 操作;
          • 代码示例:(仅示例支付买家数计算)
            • SELECT    t2.seller_id
                              ,t2.price_seg_id
                              ,SUM(case when pay_ord_byr_cnt_1w_001 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_001    --最近 7 天支付买家数
                              ,SUM(case when pay_ord_byr_cnt_1w_002 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_002    --最近 7 天 PC 端支付买家数
                              ,SUM(case when pay_ord_byr_cnt_1w_003 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_003    --最近 7 天无线端支付买家数
                              ,SUM(case when pay_ord_byr_cnt_1m_002 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_002    --最近 30 天支付买家数
                              ,SUM(case when pay_ord_byr_cnt_1m_003 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_003    --最近 30 天 PC 端支付买家数
                              ,SUM(case when pay_ord_byr_cnt_1m_004 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_004    --最近 30 天无线端支付买家数
              FROM
              (
                      SELECT    a1.seller_id
                                      ,a2.price_seg_id
                                      ,buyer_id
                                      ,COUNT(buyer_id) AS pay_ord_byr_cnt_1m_002    --最近 30 天支付买家数
                                      ,COUNT(CASE WHEN is_wireless = 'N' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1m_003    --最近 30 天 PC 端支付买家数
                                      ,COUNT(CASE WHEN is_wireless = 'Y' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1m_004    --最近 30 天无线端支付买家数
                                      ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') then buyer_id else null end) AS pay_ord_byr_cnt_1w_001    --最近 7 天支付买家数
                                      ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') and is_wireless = 'N' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1w_002    --最近 7 天 PC 端支付买家数
                                      ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') and is_wireless = 'Y' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1w_003    --最近 7 天无线端支付买家数
              FROM
                          (
                              select *
                              from    table_pay        --支付表
                          ) a1
              JOIN ( SELECT item_id
                                      ,price_seg_id
                         FROM    tag_itm        --商品 tag 表
                         WHERE  ds = '${bizdate}'
                      ) a2
              ON ( a1.item_id = a2.item_id )
              GROUP BY a1.seller_id               --原表粒度
                              ,a2.price_seg_id        --原表粒度
                              ,buyer_id
              )  t2
              GROUP BY t2.seller_id               --原表粒度
                              ,t2.price_seg_id        --原表粒度
            • 修改后运行时间为 13 min,整体运行的 LogView 日志如下图:数据没有膨胀;

 3/3)思考

  1. 对 Multi Distinct 的思考:

    1. 上述方案中如果出现多个需要去重的指标,那么在把不同指标 Join 在一起之前,一定要确保指标的粒度是原始表的数据粒度;
      • 如,支付买家数和支付商品数,在子查询中指标粒度分别是:原始表的数据粒度 + buyer_id 和原始表的数据粒度 + item_id,这时两个指标不是同一数据粒度,所以不能 Join,需要再套一层代码,分别把指标 Group By 到 “原始表的数据粒度”,然后再进行 Join 操作;
    2. 在性能和代码简洁、可维护之间需要根据具体情况进行权衡
      • 情况 1
        • 修改前的 Multi Distinct 代码的可读性比较强,代码简洁,便于维护;修改后的代码较为复杂;
          • 特点:一般代码改动比较大,需要投入一定的时间成本;
          • 解决思路:可以考虑做成自动化,通过检测代码、优化代码自动生成;
      • 情况 2
        • 当出现的Distinct 个数不多、表的数据量也不是很大、表的数据分布较均匀时,可以不使用 Multi Distinct 进行计算;
    3. 考虑上述两种情况的另一种处理方式

      • 情况 1 及处理方式:当代码比较臃肿时,也可以将上述子查询落到中间表里,这样数据模型更合理、复用性更强、层次更清晰;
      • 情况 2 及处理方式:当需要去除类似的多个 Distinct 时,可以查一下是否有更细粒度的表可用,避免重复计算;
  2. 两个要注意的问题

    1. 目前 Reduce 端数据倾斜很多是由 Count Distinct 问题引起的,因此,在 ETL 开发工作中应该予以重视 Count Distinct 问题,避免数据膨胀;
    2. 对于一些表的 Join 阶段的 NULL 值问题,应该对表的数据分布要有清楚的认识,在开发时解决这个问题;
原文地址:https://www.cnblogs.com/volcao/p/13640434.html