原子操作与内存屏障之二——原子操作

问题

以下多线程对int型变量x的操作,哪几个需要进行同步:( ABC )
A. x=y; B. x++; C. ++x; D. x=1;  

引子

先看多线程同步的一个例子,如下面的代码,并发开3个线程,每个线程各自对同一个计数器自增100万次,预期结果应为300万。

【例子一】test.c

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>
#include <sys/time.h>

void* count0(void* data) {
    int* p = (int*)data;
    for (int i = 0; i < 1e6; i++) {
        (*p)++;
    }
    return 0;
}

void test0(int concurrent) {
    int t = 0;

    pthread_t* pt = malloc(sizeof(pthread_t)* concurrent);

    for (int i = 0; i < concurrent; i++)
        pthread_create(pt + i, NULL, count0, &t);

    for (int i = 0; i < concurrent; i++)
        pthread_join(pt[i], NULL);

    printf("test0(simple) t=%d
", t);
    free(pt);
}

int main() {
    const int concurrent = 3;
    test0(concurrent);
}

编译代码

gcc test.c -lpthread

结果一般都会小于300万,因为自增不是原子操作,而线程也并未做任何同步措施。

要解决这个通常可以采取原子类型或者直接加锁,代码如下:

【例子二】原子操作

void* count1(void* data) {
    atomic_int* p = (atomic_int*)data;
    for (int i = 0; i < 1e6; i++) {
        //atomic_fetch_add(p, 1);
        atomic_fetch_add_explicit(p, 1, __ATOMIC_SEQ_CST );
    }   
    return 0;
}

void test1(int concurrent) {
    atomic_int t = ATOMIC_VAR_INIT(0);
    //printf("atomic_is_lock_free=%d
", atomic_is_lock_free(&t));

    pthread_t* pt = malloc(sizeof(pthread_t)* concurrent);

    for (int i = 0; i < concurrent; i++)
        pthread_create(pt + i, NULL, count1, &t);

    for (int i = 0; i < concurrent; i++)
        pthread_join(pt[i], NULL);

    printf("test1(atomic) t=%d
", t); 
    free(pt);
}

【例子三】加锁:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* count2(void* data) {
    int* p = (int*)data;
    for (int i = 0; i < 1e6; i++) {
        pthread_mutex_lock(&mutex);
        (*p)++;
        pthread_mutex_unlock(&mutex);
    }   
    return 0;
}

void test2(int concurrent) {
    int t = 0;

    pthread_t* pt = malloc(sizeof(pthread_t)* concurrent);

    for (int i = 0; i < concurrent; i++)
        pthread_create(pt + i, NULL, count2, &t);

    for (int i = 0; i < concurrent; i++)
        pthread_join(pt[i], NULL);

    printf("test2(lock) t=%d
", t);
    free(pt);
}

上面两个代码都可以得到正确的预期结果,不过如何加锁和使用原子操作都不是我们这里要讨论的重点,我们希望了解这两种方式的底层原理。

回到例子一,如果将gcc编译选项加上-O1呢?结果就正确了,但我们需要知道原因。

提到自增操作是非原子的,我们写个简单例子看下

【例子四】test0.c

int main() {
    int a = 7;
    int b = 3;
    for (int n=0; n<10000; n++)
        a++;
    b = a;
    return b;
}

 查看其反汇编

gcc test0.c -c -S -O0

结果:

    .section    __TEXT,__text,regular,pure_instructions
    .build_version macos, 10, 15    sdk_version 10, 15, 6
    .globl  _main                   ## -- Begin function main
    .p2align    4, 0x90
_main:                                  ## @main
    .cfi_startproc
## %bb.0:
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset %rbp, -16 
    movq    %rsp, %rbp
    .cfi_def_cfa_register %rbp
    movl    $0, -4(%rbp)
    movl    $7, -8(%rbp)
    movl    $3, -12(%rbp)
    movl    $0, -16(%rbp)
LBB0_1:                                 ## =>This Inner Loop Header: Depth=1
    cmpl    $10000, -16(%rbp)       ## imm = 0x2710
    jge LBB0_4
## %bb.2:                               ##   in Loop: Header=BB0_1 Depth=1
    movl    -8(%rbp), %eax
    addl    $1, %eax
    movl    %eax, -8(%rbp)
## %bb.3:                               ##   in Loop: Header=BB0_1 Depth=1
    movl    -16(%rbp), %eax
    addl    $1, %eax
    movl    %eax, -16(%rbp)
    jmp LBB0_1
LBB0_4:
    movl    -8(%rbp), %eax
    movl    %eax, -12(%rbp)
    movl    -12(%rbp), %eax
    popq    %rbp
    retq
    .cfi_endproc
                                        ## -- End function

.subsections_via_symbols

其中%rbp是栈指针,从## %bb.2: 段代码可见,自增操作分为3步:

  1. 内存变量值写到寄存器%eax;
  2. %eax 寄存器值加1;
  3. %eax 寄存器值回写到内存中;

同样,从## %bb.3: 段代码可见,赋值操作也分为类似3步。

所以例子一中,多线程并发对同一个计数器自增,会导致一些操作的丢失。

如果gcc加上优化选项:

gcc test0.c -c -S -O1

反汇编结果如下:

    .section    __TEXT,__text,regular,pure_instructions
    .build_version macos, 10, 15    sdk_version 10, 15, 6
    .globl  _main                   ## -- Begin function main
    .p2align    4, 0x90
_main:                                  ## @main
    .cfi_startproc
## %bb.0:
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset %rbp, -16 
    movq    %rsp, %rbp
    .cfi_def_cfa_register %rbp
    movl    $10007, %eax            ## imm = 0x2717
    popq    %rbp
    retq
    .cfi_endproc
                                        ## -- End function

.subsections_via_symbols

编译器生成的汇编代码大大简化了,甚至都丢掉了 b=3 的赋值语句(因为编译器认为丢掉它不影响后面b=a的结果)。

如果在变量b的声明处加上volatile关键字,

int main() {
    int a = 7;
    volatile int b = 3;
    for (int n=0; n<10000; n++)
        a++;
    b = a;
    return b;
}

同样-O1选项下的反汇编结果:

    .section    __TEXT,__text,regular,pure_instructions
    .build_version macos, 10, 15    sdk_version 10, 15, 6
    .globl  _main                   ## -- Begin function main
    .p2align    4, 0x90
_main:                                  ## @main
    .cfi_startproc
## %bb.0:
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset %rbp, -16 
    movq    %rsp, %rbp
    .cfi_def_cfa_register %rbp
    movl    $3, -4(%rbp)
    movl    $10007, -4(%rbp)        ## imm = 0x2717
    movl    -4(%rbp), %eax
    popq    %rbp
    retq
    .cfi_endproc
                                        ## -- End function

.subsections_via_symbols

加上volatile之后,对变量b的读写操作都会直接作用于其内存地址,也不会再忽略 b=3 这条语句了。

volatile

我们知道volatile修饰的变量在每次被线程访问时,都强迫从内存中重读该变量的值。而且,当变量值发生变化时,强迫线程将变化值回写到内存。这样在任何时刻,两个不同的线程总是看到某个变量的同一个值。那么volatile 能代替原子操作吗,看个例子

typedef struct {
    volatile int count;
} atomic_t;

void* count3(void* data) {
    atomic_t* p = (atomic_t*)data;
    for (int i = 0; i < 1e6; i++) {
        (p->count)++;
    }
    return 0;
}

void test3(int concurrent) {
    atomic_t t = {0};

    pthread_t* pt = malloc(sizeof(pthread_t)* concurrent);

    for (int i = 0; i < concurrent; i++)
        pthread_create(pt + i, NULL, count3, &t);

    for (int i = 0; i < concurrent; i++)
        pthread_join(pt[i], NULL);

    printf("test3(volatile) t=%d
", t.count);
    free(pt);
}

结果不会是300万。也就是说 volatile不能保证所修饰的变量进行原子操作,它不能解决多线程的问题!

如果想让上面代码实现原子操作,可以使用如下方式:

void atomic_inc(atomic_t* v) {
    asm volatile(
            "lock incl %0"
            :"+m"(v->count) // output operands
            :               // input operands
            :               // list of clobbered registers 
            );  
}

void* count4(void* data) {
    atomic_t* p = (atomic_t*)data;
    for (int i = 0; i < 1e6; i++) {
        atomic_inc(p);
    }   
    return 0;
}

void test4(int concurrent) {
    atomic_t t = {0};

    pthread_t* pt = malloc(sizeof(pthread_t)* concurrent);

    for (int i = 0; i < concurrent; i++)
        pthread_create(pt + i, NULL, count4, &t);

    for (int i = 0; i < concurrent; i++)
        pthread_join(pt[i], NULL);

    printf("test4(volatile_atomic) t=%d
", t.count);
    free(pt);
}

该例子使用内联汇编实现了一个自增的原子操作,原理是修改变量前,先通过LOCK#命令锁住内存总线,从而阻止其它CPU的内存操作。

类似的,下面例子是实现一个自旋锁:

typedef struct {
    volatile int count;
} atomic_t;

static inline int atomic_xchg(atomic_t* v, int i) {
    int ret;
    asm volatile(
            "xchgl %0, %1"
            :"=r"(ret)
            :"m"(v->count),"0"(i)
            );

    return ret;
}

void mutex_lock(atomic_t* v) {
    while (1 == atomic_xchg(v, 1)) {
        sched_yield();
    }
}

void mutex_unlock(atomic_t* v) {
    v->count = 0;
}

 实际上,大部分锁机制也是通过compare-and-exchange这类命令实现的。

原子操作

以常见的X86 CPU来说,它基以下三种机制保证了多核中加锁的原子操作:

  1. 一些基本的内存读写操作是本身已经被硬件提供了原子性保证(例如读写单个字节的操作);
  2. 一些需要保证原子性但是没有被第(1)条机制提供支持的操作(例如read-modify-write)可以通过使用”LOCK#”来锁定总线,从而保证操作的原子性;
  3. 因为很多内存数据是已经存放在L1/L2 cache中了,对这些数据的原子操作只需要与本地的cache打交道,而不需要与总线打交道,所以CPU就提供了cache coherency机制来保证其它的那些也cache了这些数据的processor能读到最新的值

对其中第一点而言,以下的基本内存操作是原子的:

  • Reading or writing a byte(一个字节的读写)
  • Reading or writing a word aligned on a 16-bit boundary(对齐到16位边界的字的读写)
  • Reading or writing a doubleword aligned on a 32-bit boundary(对齐到32位边界的双字的读写)
  • Reading or writing a quadword aligned on a 64-bit boundary(对齐到64位边界的四字的读写)
  • 16-bit accesses to uncached memory locations that fit within a 32-bit data bus(未缓存且在32位数据总线范围之内的内存地址的访问)
  • Unaligned 16-, 32-, and 64-bit accesses to cached memory that fit within a cache line(对单个cache line中缓存地址的未对齐的16/32/64位访问)

对于不以16位、32位对齐的读写,可以参考bit field的例子。bit field 常用来高效的存储有限位数的变量,多用于内核/底层开发中。

一般来说,对同一个结构体内的不同bit成员的多线程访问是无法保证线程安全的,看个例子:

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>

typedef struct _foo {
        int flag : 1;
        int counter : 15;
} foo;

pthread_mutex_t my_mutex_for_flag = PTHREAD_MUTEX_INITIALIZER; 
pthread_mutex_t my_mutex_for_counter = PTHREAD_MUTEX_INITIALIZER; 
 
/* in thread 1 */
void* fn1 (void* data) {
        foo* my_foo = (foo*) data;
        pthread_mutex_lock(&my_mutex_for_flag);
        for (int n = 0; n < 1000; n++) my_foo->flag = !my_foo->flag;
        pthread_mutex_unlock(&my_mutex_for_flag);
        return 0;
}
 
/* in thread 2 */
void* fn2 (void* data) {
        foo* my_foo = (foo*) data;
        pthread_mutex_lock(&my_mutex_for_counter);
        for (int n = 0; n < 1000; n++) ++my_foo->counter;
        pthread_mutex_unlock(&my_mutex_for_counter);
        return 0;
}

int main() {
        pthread_t t1, t2;
        foo f = {0, 0};
        pthread_create(&t1, NULL, fn1, &f);
        pthread_create(&t2, NULL, fn2, &f);

        pthread_join(t1, 0);
        pthread_join(t2, 0);

        printf("flag=%d; counter=%d
", f.flag, f.counter);
}

反复执行时,结果如下:

flag=0; counter=1000
flag=-1; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=-1; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=946

可见,如果 foo.flag 和 foo.counter 存储在同一个word里,CPU在读写任何一个bit member的时候会同时把两个值一起读进寄存器,从而造成读写冲突。这个例子正确的处理方式是用一个mutex同时保护 foo.flag 和 foo.counter,这样才能确保读写是线程安全的。

参考文档:

http://ericnode.info/post/atomic_in_c11/

http://www.parallellabs.com/2010/04/15/atomic-operation-in-multithreaded-application/

https://liam.page/2018/01/18/volatile-in-C-and-Cpp/

https://zhuanlan.zhihu.com/p/115355303

https://zhuanlan.zhihu.com/p/102406978

原文地址:https://www.cnblogs.com/chenny7/p/13572322.html