Spark 共享变量之——Accumulator(累加器)

一、简介

为了方便的统计和管理一些共同信息,Spark中定义了两种共享变量——Broadcast(广播变量)和 Accumulator(累加器),可以方便的把一些变量或数据共享给集群各个节点,今天来看看Accumulator。

Accumulator是由Driver端总体维护的,读取当前值也是在Driver端,各个Task在其所在的Executor上也维护了Accumulator变量,但只是局部性累加操作,运行完会到Driver端去合并累加结果。Accumulator有两个性质:

1、只可累加,合并即累加;

2、不改变Spark作业Lazy执行的特点,也即没有action操作触发job的情况下累加器的值有可能是初始值。

二、Accumulator的分类(Spark2.x):

1、Spark自带类型的累加器

     (1)LongAccumulator(long类型的累加器累加整型)

     (2)DoubleAccumulator(double类型的累加器累加浮点型)

    (3)CollectionAccumulator(集合类型累加器累加集合元素)

创建方式如下:

LongAccumulator longAccumulator = sc.sc().longAccumulator("longAccumulator");//其中longAccumulator为该累加器在web UI上的名称

2、自定义累加器 ——累加器类需继承AccumulatorV2抽象类

需要实现其中add()方法、Merge()方法、value()方法等必要和非必要方法;

以下我实现了一个字符串拼接的自定义累加器:

package com.renyang.sparkproject.spark.session;

import com.renyang.sparkproject.constant.Constants;
import com.renyang.sparkproject.util.StringUtils;
import org.apache.spark.util.AccumulatorV2;

public class SessionAggrStatAccumulatorV2 extends AccumulatorV2<String, String> {
    private static final long serialVersionUID = 6311074555136039130L;

    private String data = "session_count=0|1s_3s=0|4s_6s=3|7s_9s=0|10s_30s=0|30s_60s=0|1m_3m=0|3m_10m=0|10m_30m=0|30m=0|1_3=0|4_6=1|7_9=0|10_30=0|30_60=0|60=0";

    private String zero = data;

    @Override
    public boolean isZero() {
        return data.equals(zero);
    }

    @Override
    public AccumulatorV2<String, String> copy() {
        return new SessionAggrStatAccumulatorV2();
    }

    @Override
    public void reset() {
        data = zero;
    }

    public void add(String v) {
        data = add(data, v);
    }

    @Override
    public void merge(AccumulatorV2<String, String> other) {
        SessionAggrStatAccumulatorV2 o =(SessionAggrStatAccumulatorV2)other;
        String[] words = data.split("\|");
        String[] owords = o.data.split("\|");
        for (int i = 0; i < words.length; i++) {
            for (int j = 0; j < owords.length; j++) {
                if (words[i].split("=")[0].equals(owords[j].split("=")[0])){
                    int value = Integer.valueOf(words[i].split("=")[1]) +Integer.valueOf(owords[j].split("=")[1]);
                    String ns = StringUtils.setFieldInConcatString(data, "\|", owords[j].split("=")[0], String.valueOf(value));
                    //每次合并完,更新str
                    data = ns;
                }
            }
        }
    }

    @Override
    public String value() {
        return data;
    }

    /**
     * session统计计算逻辑
     * @param v1 连接串
     * @param v2 范围区间
     * @return 更新以后的连接串
     */
    private String add(String v1, String v2) {
        // 校验:v1为空的话,直接返回v2
        if(StringUtils.isEmpty(v1)) {
            return v2;
        }

        // 使用StringUtils工具类,从v1中,提取v2对应的值,并累加1
        String oldValue = StringUtils.getFieldFromConcatString(v1, "\|", v2);
        if(oldValue != null) {
            // 将范围区间原有的值,累加1
            int newValue = Integer.valueOf(oldValue) + 1;
            // 使用StringUtils工具类,将v1中,v2对应的值,设置成新的累加后的值
            return StringUtils.setFieldInConcatString(v1, "\|", v2, String.valueOf(newValue));
        }

        return v1;
    }
}

 三、Accumulator的运行逻辑

1、Driver端负责定义和注册累加器

累加器在Driver端被定义并初始化,同时需要注册入SparkContext,这样才能将累加器变量分发到集群各个节点,等到各个Task运行完之后会回收累加器结果进行Driver端合并,这个合并的过程是根据Task执行情况而定,只要有完成的Task就会更新累加器变量。

2、Executor端

Executor接收到Task之后,不但会反序列化RDD和Function,还会反序列化Accumulator,当Executor执行完Task之后,会将结果随同Accumulator一起返回给Driver端。

原文地址:https://www.cnblogs.com/renyang/p/12606725.html