spark2.1 自定义累加器的使用

spark2.1 自定义累加器的使用

  • 继承AccumulatorV2类,并复写它的所有方法

    package spark
    
    import constant.Constant
    import org.apache.spark.util.AccumulatorV2
    import util.getFieldFromConcatString
    import util.setFieldFromConcatString
    
    
    /**
     * Custom Spark accumulator that keeps a set of named integer counters packed
     * into a single `name=count|name=count|...` string.
     *
     * The counters are the session count plus the visit-length and step-length
     * buckets listed in [FIELDS]. Callers increment a counter by passing its
     * name to [add]; per-task partial results are combined with [merge].
     */
    open class SessionAccmulator : AccumulatorV2<String, String>() {

        /** Current packed counter string; starts with every counter at zero. */
        private var result = INITIAL_VALUE

        /** Returns the current packed counter string. */
        override fun value(): String = result

        /**
         * Merges the counters of [other] into this accumulator by summing them
         * field by field.
         *
         * A `null` argument or an accumulator of a different type is ignored.
         * Fields that are missing from [other] leave this accumulator's value
         * for that field untouched.
         */
        override fun merge(other: AccumulatorV2<String, String>?) {
            if (other !is SessionAccmulator) return

            var merged = result
            for (field in FIELDS) {
                val theirs = other.result.getFieldFromConcatString("|", field)
                if (theirs.isEmpty()) continue

                // Add the other side's count to our own instead of overwriting it,
                // so merging several partial results yields the overall total.
                val ours = merged.getFieldFromConcatString("|", field)
                val ownCount = if (ours.isEmpty()) 0 else ours.toInt()
                merged = merged.setFieldFromConcatString("|", field, (ownCount + theirs.toInt()).toString())
            }
            result = merged
        }

        /** Creates a new [SessionAccmulator] carrying the same counter values. */
        override fun copy(): AccumulatorV2<String, String> {
            val duplicate = SessionAccmulator()
            duplicate.result = result
            return duplicate
        }

        /**
         * Increments the counter named [p0] by one.
         *
         * A `null` or empty name is ignored, and so is a name that is not
         * present in the counter string; in both cases the accumulator keeps
         * its current value.
         */
        override fun add(p0: String?) {
            val field = p0 ?: return
            if (field.isEmpty()) return

            val current = result.getFieldFromConcatString("|", field)
            // Unknown field: keep the existing counters rather than clearing them.
            if (current.isEmpty()) return

            result = result.setFieldFromConcatString("|", field, (current.toInt() + 1).toString())
        }

        /** Resets every counter back to zero. */
        override fun reset() {
            result = INITIAL_VALUE
        }

        /** Returns `true` when every counter still holds its initial zero value. */
        override fun isZero(): Boolean = result == INITIAL_VALUE

        companion object {
            /** Names of all counters, in the order they appear in the packed string. */
            private val FIELDS: List<String> = listOf(
                    Constant.SESSION_COUNT,
                    Constant.TIME_PERIOD_1s_3s,
                    Constant.TIME_PERIOD_4s_6s,
                    Constant.TIME_PERIOD_7s_9s,
                    Constant.TIME_PERIOD_10s_30s,
                    Constant.TIME_PERIOD_30s_60s,
                    Constant.TIME_PERIOD_1m_3m,
                    Constant.TIME_PERIOD_3m_10m,
                    Constant.TIME_PERIOD_10m_30m,
                    Constant.TIME_PERIOD_30m,
                    Constant.STEP_PERIOD_1_3,
                    Constant.STEP_PERIOD_4_6,
                    Constant.STEP_PERIOD_7_9,
                    Constant.STEP_PERIOD_10_30,
                    Constant.STEP_PERIOD_30_60,
                    Constant.STEP_PERIOD_60
            )

            /** Packed string with every counter set to zero: `name=0|name=0|...`. */
            private val INITIAL_VALUE: String = FIELDS.joinToString("|") { "$it=0" }
        }
    }
    
    方法介绍

    value方法:获取累加器中的值

       merge方法:该方法特别重要,一定要写对,这个方法是各个task的累加器进行合并的方法(下面介绍执行流程中将要用到)

        isZero方法:判断累加器当前是否为初始值

        reset方法:重置累加器中的值

        copy方法:拷贝累加器

spark中累加器的执行流程:

          首先,有几个task,Spark engine就会调用copy方法拷贝出几个累加器(这些副本不会被注册),然后各个task在自己的副本上进行累加(注意:在此过程中,最初注册的累加器的值保持不变);task执行结束后,Spark会调用merge方法,把各个task的结果累加器逐一合并到被注册的累加器中(合并开始时,被注册的累加器仍是初始值)。

原文地址:https://www.cnblogs.com/zhangweilun/p/6684776.html