探究加法操作的原子性

加法在多线程下是否可靠
我们先看下面的实例:

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <stdio.h>
#include <memory>
using namespace std; 

int g_count = 0; 

int main(int argc, const char *argv[])
{
    vector<unique_ptr<thread>> ths;
    for (int i = 0; i < 100; i++) {
        ths.push_back(unique_ptr<thread>(new thread([]()
                {
                    for (int i = 0; i < 1000000; i++) {
                        ++g_count;
                    }
                })));
    } 

    for(auto &f : ths)
    {
        f->join();
    } 

    printf("count = %d
", g_count);
    return 0;
}

这段代码使用了C++11的thread模块以及lambda匿名函数特性。每个线程都对全局变量count进行加一,一共100个线程,每个线程执行1000000,那么最后的结果应该是100 * 1000000 = 100000000,一亿。 我们查看下实际的结果:

➜  test  g++ 1.cc -std=c++11 -lpthread
➜  test  ./a.out
count = 64526893
➜  test  ./a.out
count = 88669187
➜  test  ./a.out
count = 60832281
➜  test  ./a.out
count = 82043096
➜  test  ./a.out
count = 65450522
➜  test


结果和我们预期的不太一样,结果低于 10000 0000.那么这是为什么呢?

加法的原理

mov eax, DWORD PTR [test]  
inc eax  
mov DWORD PTR [text], eax


这是一段VS下的汇编(本人不会写汇编,这段代码由他人提供,虽然不是Linux平台,但是原理类似),从这段代码,我们可以看出。加一操作是分为三个步骤:

    1.从内存中取数据到寄存器 
    2.对寄存器执行+1 
    3.将结果存回内存

但是,上面的这三个指令不具有原子性,也就是说,这三个操作有可能被打断,而不是一气呵成的。 如果两个线程同时执行+1,那么这两个线程可能交替执行,过程如下:

1.线程A从内存中取到数据,此时为33
2.线程B取到也为33.
3.A对数据执行加法,为34
4.A将结果返回内存,保存在34
5.B对数据执行加法,为34,
6.B将结果返回内存,为34.

一个值为33的变量,相加2次,结果居然是34.这个错误的结果正是加法操作的费原子性导致的。

那么解决方案在哪里?

加锁保护加法操作
这听起来是可行的,我们对之前的代码稍作改动,如下:

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <stdio.h>
#include <mutex>
#include <memory>
using namespace std; 

int g_count = 0;
std::mutex g_lock; 

int main(int argc, const char *argv[])
{
    vector<unique_ptr<thread>> ths;
    for (int i = 0; i < 100; i++) {
        ths.push_back(unique_ptr<thread>(new thread([]()
                {
                    for (int i = 0; i < 1000000; i++) {
                        g_lock.lock();
                        ++g_count;
                        g_lock.unlock();
                    }
                })));
    } 

    for(auto &f : ths)
    {
        f->join();
    } 

    printf("count = %d
", g_count);
    return 0;
}

然后查看运行结果:

➜  test  ./a.out
count = 100000000
➜  test  ./a.out
count = 100000000
➜  test  ./a.out
count = 100000000
➜  test  ./a.out
count = 100000000
➜  test
我们看到使用加锁,保证了结果的正确性,因此加锁是种可行的解决方案。

使用CAS原子操作
我们注意到一个细节,使用了mutex后,结果虽然正确了,但是程序运行的时间也大大增加了。 因为锁就是一把独木桥,程序中100个线程争先恐后去走独木桥,结果可想而知。

那么好的解决方案是什么?我们采用CAS。
CAS是一组原子操作,他可以防止运行这些指令时,CPU不被打断,从而保证操作的原子性。
有关CAS,可以参考http://en.wikipedia.org/wiki/Compare-and-swap

gcc提供了一组原子操作,参考http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html

这里我们使用_syncfetchandadd,代码如下:

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <stdio.h>
#include <memory>
using namespace std; 

int g_count = 0; 

int main(int argc, const char *argv[])
{
    vector<unique_ptr<thread>> ths;
    for (int i = 0; i < 100; i++) {
        ths.push_back(unique_ptr<thread>(new thread([]()
                {
                    for (int i = 0; i < 1000000; i++) {
                        //++g_count;
                        __sync_fetch_and_add(&g_count, 1);
                    }
                })));
    } 

    for(auto &f : ths)
    {
        f->join();
    } 

    printf("count = %d
", g_count);
    return 0;
}

查看运行结果,依然是正确的。

比较效率
我们比较两种解决方案的效率问题,为了更好的对比,我们把第一个错误的代码也计算进去:

运行错误的代码,测评如下:

➜  test  time ./a.out
count = 75377651
./a.out  0.24s user 0.00s system 99% cpu 0.246 total
➜  test  time ./a.out
count = 63644431
./a.out  0.25s user 0.00s system 99% cpu 0.254 total
➜  test  time ./a.out
count = 23350388
./a.out  0.25s user 0.00s system 99% cpu 0.257 total
➜  test
加锁的代码如下:

➜  test  g++ 1.cc -std=c++11 -lpthread
➜  test  time ./a.out
count = 100000000
./a.out  2.71s user 0.01s system 99% cpu 2.723 total
➜  test  time ./a.out
count = 100000000
./a.out  2.66s user 0.01s system 99% cpu 2.676 total
➜  test  time ./a.out
count = 100000000
./a.out  2.69s user 0.01s system 99% cpu 2.699 total
➜  test
使用原子操作的代码:

➜  test  g++ 3.cc -std=c++11 -lpthread
➜  test  time ./a.out
count = 100000000
./a.out  0.67s user 0.00s system 99% cpu 0.676 total
➜  test  time ./a.out
count = 100000000
./a.out  0.65s user 0.00s system 99% cpu 0.655 total
➜  test  time ./a.out
count = 100000000
./a.out  0.65s user 0.01s system 99% cpu 0.661 total
➜  test
可以看到,使用原子操作,即保证了效率,又保证了安全性。
加锁之所以效率低,原因在于它的粒度太粗,当一个线程加锁失败,让出CPU,此时CPU的大部分调度都是无用功,因为其它线程无法继续向下执行。
而原子操作,直接避免了让出CPU,因为它通常都比较短,远远低于一个线程分配的时间片,所以对系统没有负面影响。

C++11提供的原子操作
C++11也提供了原子操作,我们使用它的代码如下:

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <stdio.h>
#include <memory>
#include <atomic>
using namespace std; 

atomic_int g_count; 

int main(int argc, const char *argv[])
{
    vector<unique_ptr<thread>> ths;
    for (int i = 0; i < 100; i++) {
        ths.push_back(unique_ptr<thread>(new thread([]()
                {
                    for (int i = 0; i < 1000000; i++) {
                        //++g_count;
                        ++g_count;
                    }
                })));
    } 

    for(auto &f : ths)
    {
        f->join();
    } 

    //printf("count = %d
", g_count);
    printf("count = %d
", g_count.load());
    return 0;
}

结果:

➜  test  g++ 4.cc -std=c++11 -lpthread
➜  test  ./a.out
count = 100000000
➜  test  ./a.out
count = 100000000
➜  test  time ./a.out
count = 100000000
./a.out  0.88s user 0.01s system 99% cpu 0.886 total
➜  test  time ./a.out
count = 100000000
./a.out  0.88s user 0.01s system 99% cpu 0.891 total
➜  test  time ./a.out
count = 100000000
./a.out  0.88s user 0.01s system 99% cpu 0.893 total

原文地址:https://www.cnblogs.com/inevermore/p/4282536.html