一致性数据服务

使用方式:先调用 ActualDataServiceManager.initQueueNum 初始化更新队列;然后继承 ActualDataModelOperationHandler 类并实现其抽象方法,调用 updateData 方法提交数据更新,调用 readData 方法读取数据。

package com.yy.fastcustom.actualupdate;

import java.util.concurrent.BlockingQueue;

/**
 * Base class for a data model whose updates are funnelled through the update queues of
 * {@link ActualDataServiceManager} and whose reads briefly wait for an in-flight update.
 *
 * <p>Subclasses supply the cache and original-data-source operations; callers use
 * {@link #updateData()} to submit an update and {@link #readData()} to read.
 *
 * <p>Created by zzq on 2019/5/16.
 *
 * @param <Data> type of the value stored in the cache and in the original data source
 */
public abstract class ActualDataModelOperationHandler<Data> {
    /** Key used to pick the update queue and to mark an update as in progress. */
    private String readWriteFlag;

    public final String getReadWriteFlag() {
        return readWriteFlag;
    }

    public final void setReadWriteFlag(String readWriteFlag) {
        this.readWriteFlag = readWriteFlag;
    }

    /** Upper bound, in milliseconds, on how long {@link #readData()} spins during an update. */
    private long readWaitMillis;

    public final long getReadWaitMillis() {
        return readWaitMillis;
    }

    public final void setReadWaitMillis(long readWaitMillis) {
        this.readWaitMillis = readWaitMillis;
    }

    /**
     * Removes the cached value. Invoked before the original data is updated.
     *
     * @return true on success
     */
    public abstract boolean removeCache();

    /**
     * Updates the original data and refreshes the cache.
     *
     * @return true on success
     */
    public abstract boolean updateOriginalDataAndCache();

    /**
     * Loads the value from the original data source (used by {@link #readData()} on a cache miss).
     *
     * @return the loaded value, or null when there is none
     */
    public abstract Data getOriginalData();

    /**
     * Reads the value from the cache.
     *
     * @return the cached value, or null on a cache miss
     */
    public abstract Data getCacheData();

    /**
     * Stores a value in the cache.
     *
     * @param data value just loaded from the original data source
     */
    public abstract void setCacheData(Data data);

    /**
     * Submits this model to the update queue selected by its read/write flag.
     *
     * @throws Exception if the update queues have not been initialized, or if the thread is
     *                   interrupted while enqueuing
     */
    public final void updateData() throws Exception {
        BlockingQueue<ActualDataModelOperationHandler> actualDataModels = ActualDataServiceManager.getUpdateQueue(this.getReadWriteFlag());
        actualDataModels.put(this);
    }

    /**
     * Combined read: cache first, then the original data source.
     *
     * <p>While an update for this flag is marked as in progress, the cache is polled until it
     * yields a value, the update finishes, or {@link #getReadWaitMillis()} has elapsed. After
     * that the cache is tried once more; on a miss the value is loaded from the original data
     * source and, if non-null, written back to the cache.
     *
     * @return the value, or null when neither the cache nor the original source has one
     */
    public final Data readData() {
        String flag = this.getReadWriteFlag();
        if (ActualDataServiceManager.contains(flag)) {
            long waitMillis = this.getReadWaitMillis();
            // nanoTime is monotonic; currentTimeMillis can jump when the wall clock is adjusted,
            // which would cut the wait short or stretch it arbitrarily.
            long startNanos = System.nanoTime();
            while (ActualDataServiceManager.contains(flag)
                    && (System.nanoTime() - startNanos) / 1_000_000L <= waitMillis) {
                Data polled = this.getCacheData();
                if (polled != null) {
                    return polled;
                }
                // Give other threads (including the updating worker) a chance to run.
                Thread.yield();
            }
        }
        Data retData = this.getCacheData();
        if (retData != null) {
            return retData;
        }
        retData = this.getOriginalData();
        if (retData != null) {
            this.setCacheData(retData);
        }
        return retData;
    }
}
package com.yy.fastcustom.actualupdate;

import java.util.*;
import java.util.concurrent.*;

/**
 * Owns the update queues, their worker threads, and the set of flags whose update is
 * currently in progress.
 *
 * <p>Created by zzq on 2019/5/16.
 */
public class ActualDataServiceManager {
    /** Each real queue is repeated this many times in the lookup list (hash range x1000). */
    private static final int VIRTUAL_NODES_PER_QUEUE = 1000;
    /** Stand-in map key for a null flag, because ConcurrentHashMap rejects null keys. */
    private static final Object NULL_FLAG_KEY = new Object();

    /** Lookup list of queues; doubles as the "initialized" guard, so it is assigned last. */
    private static volatile List<DataUpdateQueueWrapper> queueList = null;
    /** Flags with an update in progress; created eagerly so lookups work before init. */
    private static final Map<Object, Object> updateOperationTag = new ConcurrentHashMap<>();
    private static ExecutorService executor = null;

    /**
     * Creates the update queues and starts one worker per queue. Only the first successful
     * call has any effect; later calls are ignored.
     *
     * @param size number of queues (and worker threads); must be positive
     * @throws IllegalArgumentException if initialization is attempted with a non-positive size
     */
    public static void initQueueNum(int size) {
        if (queueList == null) {
            synchronized (ActualDataServiceManager.class) {
                if (queueList == null) {
                    initBasicObject(size);
                }
            }
        }
    }

    /**
     * Builds everything in locals and publishes {@code queueList} last, so that no thread can
     * observe a non-null but still empty list, and a failed attempt leaves the manager
     * uninitialized (and therefore retryable).
     */
    private static void initBasicObject(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        int sizeExtend = Math.multiplyExact(size, VIRTUAL_NODES_PER_QUEUE);
        ExecutorService workers = new ThreadPoolExecutor(
                size,
                size,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        DataUpdateQueueWrapper[] ary = new DataUpdateQueueWrapper[size];
        for (int i = 0; i < size; i++) {
            DataUpdateQueueWrapper dataUpdateQueueWrapper = new DataUpdateQueueWrapper();
            ary[i] = dataUpdateQueueWrapper;
            workers.submit(dataUpdateQueueWrapper);
        }
        List<DataUpdateQueueWrapper> nodes = new ArrayList<>(sizeExtend);
        for (int i = 0; i < sizeExtend; i++) { // generate the virtual hash nodes
            nodes.add(ary[i % size]);
        }
        executor = workers;
        queueList = nodes; // volatile write: publishes the fully built state
    }

    /**
     * Returns the update queue for a flag. The same flag always maps to the same queue.
     *
     * @param flag read/write flag; null maps to slot 0
     * @return the queue that serializes updates for this flag
     * @throws Exception ({@link IllegalStateException}) if {@link #initQueueNum(int)} has not
     *                   been called yet
     */
    public static BlockingQueue<ActualDataModelOperationHandler> getUpdateQueue(String flag) throws Exception {
        List<DataUpdateQueueWrapper> nodes = queueList;
        if (nodes == null) {
            throw new IllegalStateException("在调用该方法前,调用initQueueNum初始化更新队列个数");
        }
        int spread = 0;
        if (flag != null) {
            int hash = flag.hashCode();
            spread = hash ^ (hash >>> 16);
        }
        // The list length is not a power of two, so "(length - 1) & hash" would leave some
        // slots unreachable; floorMod gives a non-negative, evenly spread index.
        return nodes.get(Math.floorMod(spread, nodes.size())).getQueue();
    }

    /** Marks an update for the flag as in progress. */
    public static void addFlag(String flag) {
        updateOperationTag.put(keyOf(flag), "flagValue");
    }

    /** Clears the in-progress mark for the flag. */
    public static void removeFlag(String flag) {
        updateOperationTag.remove(keyOf(flag));
    }

    /** @return true while an update for the flag is marked as in progress */
    public static boolean contains(String flag) {
        return updateOperationTag.containsKey(keyOf(flag));
    }

    /** @return true when no update for the flag is marked as in progress */
    public static boolean noContains(String flag) {
        return !contains(flag);
    }

    /** Maps a possibly-null flag to a key the concurrent map accepts. */
    private static Object keyOf(String flag) {
        return flag == null ? NULL_FLAG_KEY : flag;
    }
}

更新操作容器

package com.yy.fastcustom.actualupdate;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * An update queue together with the worker loop that drains it. One instance is submitted to
 * the executor per queue, so updates placed on the same queue are applied one at a time.
 *
 * <p>Created by zzq on 2019/5/16.
 */
public class DataUpdateQueueWrapper implements Callable<Object> {
    private final BlockingQueue<ActualDataModelOperationHandler> updateQueue = new LinkedBlockingQueue<>();

    /** @return the queue drained by this worker */
    public BlockingQueue<ActualDataModelOperationHandler> getQueue() {
        return updateQueue;
    }

    /**
     * Drains the queue forever, applying one update at a time.
     *
     * <p>A runtime failure of a single update is reported on stderr and the loop continues;
     * otherwise one bad update would end the worker and every later update placed on this
     * queue would be accepted but never applied.
     *
     * @return never returns normally
     * @throws Exception {@link InterruptedException} when the worker is interrupted while
     *                   waiting for the next update; this is the only way the loop ends
     */
    @Override
    public Object call() throws Exception {
        for (; ; ) {
            ActualDataModelOperationHandler actualDataModel = updateQueue.take();
            try {
                applyUpdate(actualDataModel);
            } catch (RuntimeException e) {
                System.err.println("update failed for flag " + actualDataModel.getReadWriteFlag());
                e.printStackTrace();
            }
        }
    }

    /** Marks the flag as updating, runs the update, and always clears the mark afterwards. */
    private static void applyUpdate(ActualDataModelOperationHandler actualDataModel) {
        String flag = actualDataModel.getReadWriteFlag();
        ActualDataServiceManager.addFlag(flag); // set here, on the dedicated worker thread
        try {
            // If removing the cache fails, skip the data update. If removal succeeds, a failed
            // update still leaves readers consistent: they reload the old value from the source.
            if (actualDataModel.removeCache()) {
                actualDataModel.updateOriginalDataAndCache();
            }
        } finally {
            ActualDataServiceManager.removeFlag(flag);
        }
    }
}

测试例子

package com.yy.fastcustom.actualupdate;


/**
 * Demo model: submits one slow update, then reads from another thread while the update may
 * still be running.
 *
 * <p>Created by zzq on 2019/5/27/027.
 */
public class TestDataModel extends ActualDataModelOperationHandler<String> {
    public static void main(String[] args) {
//        ceshi_hash();

        String key = "1";
        ActualDataServiceManager.initQueueNum(3);

        final TestDataModel actualDataModel = new TestDataModel();
        actualDataModel.setReadWriteFlag(key);
        actualDataModel.setReadWaitMillis(20 * 1000);

        try {
            actualDataModel.updateData();
        } catch (Exception e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            String ret = actualDataModel.readData();

            System.out.println("=====end out====" + ret);
        }).start();
    }

    /**
     * Scratch experiment: masks a few sample hashes with 2 and with 2999 (neither is of the
     * form 2^k - 1). Inspect the locals in a debugger; only a marker line is printed.
     */
    private static void ceshi_hash() {
        int p1 = 356349;
        int p2 = 453546;
        int p3 = 678345;
        int p4 = 768678;
        int p5 = 967897;

        int r1 = p1 & 2;
        int r2 = p2 & 2;
        int r3 = p3 & 2;
        int r4 = p4 & 2;
        int r5 = p5 & 2;

        int r1_ = p1 & 2999;
        int r2_ = p2 & 2999;
        int r3_ = p3 & 2999;
        int r4_ = p4 & 2999;
        int r5_ = p5 & 2999;

        System.out.println("====");
    }

    /** Stand-in for the cached value. */
    volatile String uu = null;

    /** Simulates a slow cache removal (five one-second steps) that always succeeds. */
    @Override
    public boolean removeCache() {
        for (int i = 0; i < 5; i++) {
            System.out.println("====removeCache==缓存==");
            pauseOneSecond();
        }
        return true;
    }

    /** Simulates a slow data update; sets the stand-in cache value after the second step. */
    @Override
    public boolean updateOriginalDataAndCache() {
        for (int i = 0; i < 5; i++) {
            System.out.println("====updateOriginalDataAndCache==数据库==");
            pauseOneSecond();
            if (i == 1) {
                uu = "获取到了缓存数据";
            }
        }
        return false;
    }

    @Override
    public void setCacheData(String s) {
        uu = s;
        System.out.println("====读取数据库成功,将其放入缓存==");
    }

    @Override
    public String getOriginalData() {
        return "没有找到缓存,从数据库加载!";
    }

    /** Always reports a cache miss, so the demo exercises the fallback to the original data. */
    @Override
    public String getCacheData() {
        System.out.println("====读取缓存但没读取到==");
        return null;
    }

    /**
     * Sleeps for one second. If interrupted, restores the interrupt flag so the calling
     * thread (here, an update worker) still sees the interruption at its next blocking call.
     */
    private static void pauseOneSecond() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
原文地址:https://www.cnblogs.com/zzq-include/p/11055252.html