每日一题 为了工作 2020 0511 第六十九题

package com.swust.skynet;

import com.swust.constant.Constants;
import com.swust.utils.StringUtils;
import org.apache.spark.util.AccumulatorV2;

/**
 * In String
 * Out String
 * @author 雪瞳
 * @Slogan 时钟尚且前行,人怎能就此止步!
 * @Function 自定义累加器
 *
 */
public class SelfDefineAccumulator extends AccumulatorV2<String,String>{

    String returnResult = "";

    @Override
    public boolean isZero() {
        return "normalMonitorCount=0|normalCameraCount=0|abnormalMonitorCount=0|abnormalCameraCount=0|abnormalMonitorCameraInfos= ".equals(returnResult);
    }

    @Override
    public AccumulatorV2<String, String> copy() {
        SelfDefineAccumulator accumulator = new SelfDefineAccumulator();
        accumulator.returnResult = this.returnResult;
        return accumulator;
    }

    /**
     * 每个分区的初始值
     */
    @Override
    public void reset() {

        this.returnResult = Constants.FIELD_NORMAL_MONITOR_COUNT +"=0|"
                            +Constants.FIELD_NORMAL_CAMERA_COUNT +"=0|"
                            +Constants.FIELD_ABNORMAL_MONITOR_COUNT +"=0|"
                            +Constants.FIELD_ABNORMAL_CAMERA_COUNT +"=0|"
                            +Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS+"= ";
    }

    /**
     * 每个分区会拿着 reset 初始化的值 ,在各自的分区内相加
     * @param v
     */
    @Override
    public void add(String v) {
        returnResult = myAdd(returnResult,v);
    }

    /**
     * 每个分区最终的结果和初始值 returnResult=""  做累加
     * @param other
     */
    @Override
    public void merge(AccumulatorV2<String, String> other) {
        SelfDefineAccumulator accumulator = (SelfDefineAccumulator) other;
        returnResult = myAdd(returnResult,accumulator.returnResult);
    }

    @Override
    public String value() {
        return this.returnResult;
    }

    /**
     * 自定义累加规则
     * @param v1
     * @param v2
     * @return
     */
    private String myAdd(String v1,String v2){
        if (StringUtils.isEmpty(v1)){
            return v2;
        }
        String[] valueArray = v2.split("\|");

        for (String string:valueArray){
            String regularExpression = "=";
            String[] fieldAndValueArray = string.split(regularExpression);
            String field = fieldAndValueArray[0];
            String value = fieldAndValueArray[1];
            String oldValue = StringUtils.getFieldFromConcatString(v1,"\|",field);

            if (oldValue != null){
                //将非String类型的数据单独取出
                if (Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS.equals(field)){
                    if (value.startsWith(" ~")){
                        value = value.substring(2);
                    }
                    v1 = StringUtils.setFieldConcatString(v1,"\|",field,oldValue+" ~"+value);
                }else {
                    //其余都是int类型 直接加减即可
                    int newValue = Integer.parseInt(oldValue)+Integer.parseInt(value);
                    v1 = StringUtils.setFieldConcatString(v1,"\|",field,String.valueOf(newValue));
                }
            }
        }
        return v1;
    }

}

  

原文地址:https://www.cnblogs.com/walxt/p/12872479.html