一种高效的QPS统计方法

一、概述

对QPS的统计,通常是对一秒内各线程通过数据处理链中某一个切入点的次数进行累加计数。且不论采用何种方式都绕不开锁,那如何结合QPS统计的场景,减少线程之间对锁的竞争,是各实现方法考虑的重点问题。针对这个问题,Netflix的Hystrix限流组件中实现了一个十分高效的可进行QPS统计的工具类HystrixRollingNumber.java,该工具类也是集团限流工具Sentinel的核心。而该工具类的实现较为复杂,本文在该工具类的基础上做了一些实现上的改进,并在mac pro上用4个线程进行了测试,得到了如下对比图。从对比图可看出,优化后的方案可稳定提升效率,降低耗时,最高可达30%。

Hystrix与New Approach(改进方案)测试对比图:(纵轴:单位为毫秒,横轴:每个线程的总计数值)

              QPS对比

二、具体实现

1)Hystrix组件所用的方案

时间片、定长数组、Striped64的结合。既HystrixRollingNumber.java中的实现。

具体参考:https://github.com/Netflix/Hystrix

2)Hystrix所用方案的优缺点

对比每一个请求都加锁来统计计数,Hystrix通过尽量减少加锁次数,降低加锁粒度,从而降低线程之间对锁的竞争,在效率上有指数级的提升,实现上也完全面向对象,但对循环数组的维护稍微复杂了一些,使得线程定位Bucket也变得复杂,可能耗时稍长。

3)改进思路

主要两点:

一是在循环数组的维护与Bucket的定位上,实际可不需要Hystrix中那么多的判断条件,直接利用当前时间取余定位到循环数组中Bucket的位置,同时可通过比较当前时间点与接近上一次进入统计方法的时间点及每个Bucket对应时间片的关系即可知道是否应该进入下一个Bucket,最后计算QPS通过收集该定长数组中的数据即可。二是在一的基础上,尽量减少线程从进入统计方法到加数器LongAdder之间的耗时(此处即使多了一次赋值操作在性能上可能都会有较大影响)。

4)代码实现

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author: gxj
 */
public class QPSCalculator {


    private RollingNumber rollingNumber;

    public QPSCalculator() {
        this.rollingNumber = new RollingNumber();
    }


    public void pass() {
        rollingNumber.record();
    }


    private static final class RollingNumber {
        /**
         * 槽位的数量
         */
        private int sizeOfBuckets;
        /**
         * 时间片,单位毫秒
         */
        private int unitOfTimeSlice;
        /**
         * 用于判断是否可跳过锁争抢
         */
        private int timeSliceUsedToCheckIfPossibleToBypass;
        /**
         * 槽位
         */
        private Bucket[] buckets;
        /**
         * 目标槽位的位置
         */
        private volatile Integer targetBucketPosition;
        /**
         * 接近目标槽位最新更新时间的时间
         */
        private volatile Long latestPassedTimeCloseToTargetBucket;
        /**
         * 进入下一个槽位时使用的锁
         */
        private ReentrantLock enterNextBucketLock;
        /**
         * 默认60个槽位,槽位的时间片为1000毫秒
         */
        public RollingNumber() {
            this(60, 1000);
        }
        /**
         * 初始化Bucket数量与每个Bucket的时间片等
         *
         * @param sizeOfBuckets
         * @param unitOfTimeSlice
         */
        public RollingNumber(int sizeOfBuckets, int unitOfTimeSlice) {
            this.latestPassedTimeCloseToTargetBucket = System.currentTimeMillis() - (2 * unitOfTimeSlice);
            this.targetBucketPosition = null;
            this.sizeOfBuckets = sizeOfBuckets;
            this.unitOfTimeSlice = unitOfTimeSlice;
            this.enterNextBucketLock = new ReentrantLock();
            this.buckets = new Bucket[sizeOfBuckets];
            this.timeSliceUsedToCheckIfPossibleToBypass = 3 * unitOfTimeSlice;
            for (int i = 0; i < sizeOfBuckets; i++) {
                this.buckets[i] = new Bucket();
            }
        }


        private void record() {
            long passTime = System.currentTimeMillis();
            if (targetBucketPosition == null) {
                targetBucketPosition = (int) (passTime / unitOfTimeSlice) % sizeOfBuckets;
            }
            Bucket currentBucket = buckets[targetBucketPosition];
            if (passTime - latestPassedTimeCloseToTargetBucket >= unitOfTimeSlice) {
                if (enterNextBucketLock.isLocked() && (passTime - latestPassedTimeCloseToTargetBucket) < timeSliceUsedToCheckIfPossibleToBypass) {
                } else {
                    try {
                        enterNextBucketLock.lock();
                        if (passTime - latestPassedTimeCloseToTargetBucket >= unitOfTimeSlice) {
                            int nextTargetBucketPosition = (int) (passTime / unitOfTimeSlice) % sizeOfBuckets;
                            Bucket nextBucket = buckets[nextTargetBucketPosition];
                            if (nextBucket.equals(currentBucket)) {
                                if (passTime - latestPassedTimeCloseToTargetBucket >= unitOfTimeSlice) {
                                    latestPassedTimeCloseToTargetBucket = passTime;
                                }
                            } else {
                                nextBucket.reset(passTime);
                                targetBucketPosition = nextTargetBucketPosition;
                                latestPassedTimeCloseToTargetBucket = passTime;
                            }
                            nextBucket.pass();
                            return;
                        } else {
                            currentBucket = buckets[targetBucketPosition];
                        }
                    } finally {
                        enterNextBucketLock.unlock();
                    }
                }
            }
            currentBucket.pass();
        }

        public Bucket[] getBuckets() {
            return buckets;
        }
    }


    private static class Bucket implements Serializable {

        private static final long serialVersionUID = -9085720164508215774L;

        private Long latestPassedTime;

        private LongAdder longAdder;

        public Bucket() {
            this.latestPassedTime = System.currentTimeMillis();
            this.longAdder = new LongAdder();
        }


        public void pass() {
            longAdder.add(1);
        }

        public long countTotalPassed() {
            return longAdder.sum();
        }

        public long getLatestPassedTime() {
            return latestPassedTime;
        }

        public void reset(long latestPassedTime) {
            this.longAdder.reset();
            this.latestPassedTime = latestPassedTime;
        }
    }



    public static void main(String[] args) {
        try {
            final QPSCalculator qpsCalculator = new QPSCalculator();
            int threadNum = 4;
            CountDownLatch countDownLatch = new CountDownLatch(threadNum);
            List<Thread> threadList = new ArrayList<Thread>();
            for (int i = 0; i < threadNum; i++) {
                threadList.add(new Thread() {
                    public void run() {
                        for (int i = 0; i < 50000000; i++) {
                            qpsCalculator.pass();
                        }
                        countDownLatch.countDown();
                    }
                });
            }

            long startTime = System.currentTimeMillis();
            for (Thread thread : threadList) {
                thread.start();
            }
            countDownLatch.await();
            long endTime = System.currentTimeMillis();
            long totalTime = endTime - startTime;
            System.out.print("totalMilliseconds:  " + totalTime);
            TimeUnit.SECONDS.sleep(1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
原文地址:https://www.cnblogs.com/ganRegister/p/9369131.html