限流计数的基本数据结构

无锁环境数组

基本构成如下,窗口里面的Integer没有使用AtomicInteger, 而是把计数 根据当前线程的hashcode, 分为了若干段(根据重试次数自动裂变),减少冲突。是通过CPU, 内存来换时间的典型。它的写效率>AtomicInteger, 并发>int. 更详细的算法来自从下面的地方: Doug Lea, 可以在stripe64.java

//时间窗口
private final Integer             timeInMilliseconds;
//采样数量
private final Integer             numberOfBuckets;
//间隔步长
private final Integer             intervals;
//环形桶数组
private final BucketCircularArray buckets;

环形桶数组的基本结构

桶数组:AtomicReferenceArray

数组的游标:ListState,该对象每次新增一个桶,都会重新创建一个ListState(数组是可以复用的)。

 public ListState incrementTail() {

                if (size == numBuckets) {
                    return new ListState(data, (head + 1) % dataLength, (tail + 1) % dataLength);
                } else {
                    // increment only tail
                    return new ListState(data, head, (tail + 1) % dataLength);
                }

            }

AtomicReference :通过CAS操作设置

	ListState currentState = state.get();
        // create new version of state (what we want it to become)
        ListState newState = currentState.addBucket(o);

        /*
         * use compareAndSet to set in case multiple threads are attempting (which shouldn't be the case because
         * since addLast will ONLY be called by a single thread at a time due to protection provided in
         * <code>getCurrentBucket</code>)
         */
        if (state.compareAndSet(currentState, newState)) {
            // we succeeded
            return;
        } else {
            // we failed, someone else was adding or removing
            // instead of trying again and risking multiple addLast concurrently (which shouldn't be the case)
            // we'll just return and let the other thread 'win' and if the timing is off the next call to
            // getCurrentBucket will fix things
            return;
        }

AtomicReferenceArray:一个固化的桶数组

桶元素

Bucket:桶元素
windowStart 窗口时间
LongAdder[] 计数器
LongMaxUpdater[] 最大值

每个桶所占的单位时间,与最近桶的创建时间进行比较,看是否超过时间窗口

if (currentBucket != null && currentTime < currentBucket.windowStart + getBucketSizeInMilliseconds()) {
            // if we're within the bucket 'window of time' return the current one
            // NOTE: We do not worry if we are BEFORE the window in a weird case of where thread scheduling causes that
            // to occur,
            // we'll just use the latest as long as we're not AFTER the window
            return currentBucket;
        }

桶内计数

多线程计数器:LongAdder

与AtomicLong的比较:
LongAdder是JDK8中并发包中的一个新类,和AtomicLong都使用CAS,但是性能比AtomicLong更好。

  LongAdder在AtomicLong的基础上进行了热点分离,热点分离类似于有锁操作中的减小锁粒度,将一个锁分离成若干个锁来提高性能。在无锁中,也可以用类似的方式来增加CAS的成功率,从而提高性能。

  AtomicLong通过CAS 指令从机器指令级别操作保证并发的原子性,但在高并发时CAS的失败几率更高, 重试次数更多,越多线程重试,CAS失败几率又越高,变成恶性循环,AtomicLong效率降低。

  而LongAdder有点类似ConcurrentHashMap,将把一个value拆分成若干cell,把所有cell加起来,就是value。

  对LongAdder进行加减操作,只需要对不同的cell来操作,不同的线程对不同的cell进行CAS操作,CAS的成功率当然高了(试想一下3+2+1=6,一个线程3+1,另一个线程2+1,最后3个数字的结果是8,LongAdder没有乘法除法的API)。

  可是在并发数不是很高的情况,拆分成若干的cell,还需要维护cell和求和,效率不如AtomicLong的实现。LongAdder用了巧妙的办法来解决了这个问题。

  初始情况,LongAdder与AtomicLong是相同的,只有在CAS失败时,才会将value拆分成cell,每失败一次,都会增加cell的数量,这样在低并发时,同样高效,在高并发时,这种“自适应”的处理方式,达到一定cell数量后,CAS将不会失败,效率大大提高。

  但JDK8并没有所谓的IntegerAdder,只增加了LongAdder和DoubleAdder。

原文地址:https://www.cnblogs.com/zhulongchao/p/5573272.html