关于CAS

一.CAS是什么

CAS: 全称Compare and swap,字面意思:”比较并交换“。解决多线程并行情况下使用锁造成性能损耗的一种机制。

在计算机科学中,比较和交换(Conmpare And Swap)是用于实现多线程同步的原子指令。

它将内存位置的内容与给定值进行比较,只有在相同的情况下,将该内存位置的内容修改为新的给定值。

这是作为单个原子操作完成的。 原子性保证新值基于最新信息计算; 如果该值在同一时间被另一个线程更新,则写入将失败。 操作结果必须说明是否进行替换;

这可以通过一个简单的布尔响应(这个变体通常称为比较和设置),或通过返回从内存位置读取的值来完成。

synchronized是悲观锁,这种线程一旦得到锁,其他需要锁的线程就挂起的情况就是悲观锁。

CAS操作的就是乐观锁,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。

JAVA1.5开始引入了CAS,主要代码都放在JUC的atomic包下。

二.CAS的使用

1、自旋锁

public class SpinLock {

  private AtomicReference<Thread> sign =new AtomicReference<>();

  public void lock(){
    Thread current = Thread.currentThread();
    while(!sign .compareAndSet(null, current)){
    }
  }

  public void unlock (){
    Thread current = Thread.currentThread();
    sign .compareAndSet(current, null);
  }
}

所谓自旋锁,我觉得这个名字相当的形象,在lock()的时候,一直while()循环,直到 cas 操作成功为止。

2、AtomicInteger 的 incrementAndGet()

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

与自旋锁有异曲同工之妙,就是一直while,直到操作成功为止。

3、令牌桶限流器

所谓令牌桶限流器,就是系统以恒定的速度向桶内增加令牌。每次请求前从令牌桶里面获取令牌。如果获取到令牌就才可以进行访问。

当令牌桶内没有令牌的时候,拒绝提供服务。我们来看看 eureka 的限流器是如何使用 CAS 来维护多线程环境下对 token 的增加和分发的。

public class RateLimiter {

    private final long rateToMsConversion;

    private final AtomicInteger consumedTokens = new AtomicInteger();
    private final AtomicLong lastRefillTime = new AtomicLong(0);

    @Deprecated
    public RateLimiter() {
        this(TimeUnit.SECONDS);
    }

    public RateLimiter(TimeUnit averageRateUnit) {
        switch (averageRateUnit) {
            case SECONDS:
                rateToMsConversion = 1000;
                break;
            case MINUTES:
                rateToMsConversion = 60 * 1000;
                break;
            default:
                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
        }
    }

    //提供给外界获取 token 的方法
    public boolean acquire(int burstSize, long averageRate) {
        return acquire(burstSize, averageRate, System.currentTimeMillis());
    }

    public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
        if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
            return true;
        }

        //添加token
        refillToken(burstSize, averageRate, currentTimeMillis);

        //消费token
        return consumeToken(burstSize);
    }

    private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
        long refillTime = lastRefillTime.get();
        long timeDelta = currentTimeMillis - refillTime;

        //根据频率计算需要增加多少 token
        long newTokens = timeDelta * averageRate / rateToMsConversion;
        if (newTokens > 0) {
            long newRefillTime = refillTime == 0
                    ? currentTimeMillis
                    : refillTime + newTokens * rateToMsConversion / averageRate;

            // CAS 保证有且仅有一个线程进入填充
            if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
                while (true) {
                    int currentLevel = consumedTokens.get();
                    int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
                    int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
                    // while true 直到更新成功为止
                    if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
                        return;
                    }
                }
            }
        }
    }

    private boolean consumeToken(int burstSize) {
        while (true) {
            int currentLevel = consumedTokens.get();
            if (currentLevel >= burstSize) {
                return false;
            }

            // while true 直到没有token 或者 获取到为止
            if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
                return true;
            }
        }
    }

    public void reset() {
        consumedTokens.set(0);
        lastRefillTime.set(0);
    }
}

所以梳理一下 CAS 在令牌桶限流器的作用。就是保证在多线程情况下,不阻塞线程的填充token 和消费token。

总结:

(1)CAS 的使用能够避免线程的阻塞。

(2)多数情况下我们使用的是 while true 直到成功为止。

CAS 是整个编程重要的思想之一。整个计算机的实现中都有CAS的身影。微观上看汇编的 CAS 是实现操作系统级别的原子操作的基石。

从编程语言角度来看 CAS 是实现多线程非阻塞操作的基石。宏观上看,在分布式系统中,可以使用 CAS 的思想利用类似Redis的外部存储,也能实现一个分布式锁。

从某个角度来说架构就将微观的实现放大,或者底层思想就是将宏观的架构进行微缩。

计算机的思想是想通的,所以说了解底层的实现可以提升架构能力,提升架构的能力同样可加深对底层实现的理解。

计算机知识浩如烟海,但是套路有限。抓住基础的几个套路突破,从思想和思维的角度学习计算机知识。

不要将自己的精力花费在不停的追求新技术的脚步上,跟随‘start guide line’只能写一个demo,所得也就是一个demo而已。

三.CAS原理

CAS通过调用JNI的代码实现的。JNI:Java Native Interface为JAVA本地调用,允许java调用其他语言。

而compareAndSwapInt就是借助C来调用CPU底层指令实现的。Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp中。

实现思想 CAS(V, A, B),V为内存地址、A为预期原值,B为新值。如果内存地址的值与预期原值相匹配,那么将该位置值更新为新值。

否则,说明已经被其他线程更新,处理器不做任何操作;无论哪种情况,它都会在 CAS 指令之前返回该位置的值。

而我们可以使用自旋锁,循环CAS,重新读取该变量再尝试再次修改该变量,也可以放弃操作。

CAS操作由处理器提供支持,是一种原语。原语是操作系统或计算机网络用语范畴。是由若干条指令组成的,用于完成一定功能的一个过程,具有不可分割性,

即原语的执行必须是连续的,在执行过程中不允许被中断。如 Intel 处理器,比较并交换通过指令的 cmpxchg 系列实现。

Unsafe类,在sun.misc包下,不属于Java标准。Unsafe类提供一系列增加Java语言能力的操作,如内存管理、操作类/对象/变量、多线程同步等。

其中与CAS相关的方法有以下几个:

//var1为CAS操作的对象,offset为var1某个属性的地址偏移值,expected为期望值,var2为要设置的值,利用JNI来完成CPU指令的操作
public final native boolean compareAndSwapObject(Object var1, long offset, Object expected, Object var2);
public final native boolean compareAndSwapInt(Object var1, long offset, int expected, int var2);
public final native boolean compareAndSwapLong(Object var1, long offset, long expected, long var2);

(1)java 的 cas 利用的的是 unsafe 这个类提供的 cas 操作。

(2)unsafe 的cas 依赖了的是 jvm 针对不同的操作系统实现的 Atomic::cmpxchg。

(3)Atomic::cmpxchg 的实现使用了汇编的 cas 操作,并使用 cpu 硬件提供的 lock信号保证其原子性。

1、什么是CAS机制

CAS是英文单词Compare And Swap的缩写,翻译过来就是比较并替换。

CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。

更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。

例子:

(1)在内存地址V当中,存储着值为10的变量。

(2)此时线程1想要把变量的值增加1。对线程1来说,旧的预期值A=10,要修改的新值B=11。

(3)在线程1要提交更新之前,另一个线程2抢先一步,把内存地址V中的变量值率先更新成了11。

(4)线程1开始提交更新,首先进行A和地址V的实际值比较(Compare),发现A不等于V的实际值,提交失败。

(5)线程1重新获取内存地址V的当前值,并重新计算想要修改的新值。此时对线程1来说,A=11,B=12。这个重新尝试的过程被称为自旋。

(6)这一次比较幸运,没有其他线程改变地址V的值。线程1进行Compare,发现A和地址V的实际值是相等的。

(7)线程1进行SWAP,把地址V的值替换为B,也就是12。

从思想上来说,Synchronized属于悲观锁,悲观地认为程序中的并发情况严重,所以严防死守。

CAS属于乐观锁,乐观地认为程序中的并发情况不那么严重,所以让线程不断去尝试更新。

CAS的缺点:

(1)ABA问题。

因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,

那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。

ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。

从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。

这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

(2)循环时间长开销大。

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),

使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。

第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

(3)只能保证一个共享变量的原子操作。

当对一个共享变量执行操作时,可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,

这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。

从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。

2、concurrent包的实现

由于java的CAS同时具有 volatile 读和volatile写的内存语义,因此Java线程之间的通信现在有了下面四种方式:

(1)A线程写volatile变量,随后B线程读这个volatile变量。

(2)A线程写volatile变量,随后B线程用CAS更新这个volatile变量。

(3)A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量。

(4)A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量。

Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,

这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读-改-写指令的计算机器,是顺序计算图灵机的异步等价机器,

因此任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操作的原子指令)。

同时,volatile变量的读/写和CAS可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。

如果仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:

(1)首先,声明共享变量为volatile;

(2)然后,使用CAS的原子条件更新来实现线程之间的同步;

(3)同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,

而concurrent包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent包的实现示意图如下:

3、相关的概念

关于java中锁的类型

(1)公平锁/非公平锁

(2)可重入锁

(3)独享锁/共享锁

(4)互斥锁/读写锁

(5)乐观锁/悲观锁

(6)分段锁

(7)偏向锁/轻量级锁/重量级锁

(8)自旋锁

悲观锁(Pessimistic Lock):

顾名思义,就是很悲观,假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作,

每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。

传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。

悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。


乐观锁(Optimistic Lock):

顾名思义,就是很乐观,假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性。

乐观锁不能解决脏读的问题。每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。

乐观锁适用于多读的应用类型,这样可以提高吞吐量。

从上面的描述我们可以看出,悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景,不加锁会带来大量的性能提升

悲观锁在Java中的使用,就是利用各种锁。如synchronized。

乐观锁在Java中的使用,是无锁编程,常常采用的是CAS算法,典型的例子就是原子类,通过CAS自旋实现原子操作的更新。

在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁。

锁机制存在以下问题:

(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。

(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。

(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。

volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。

独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。

所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,Compare and Swap。

原文地址:https://www.cnblogs.com/ZJOE80/p/12896528.html