问题
以下多线程对int型变量x的操作,哪几个需要进行同步:( ABC )
A. x=y; B. x++; C. ++x; D. x=1;
引子
先看多线程同步的一个例子,如下面的代码,并发开3个线程,每个线程各自对同一个计数器自增100万次,预期结果应为300万。
【例子一】test.c
#include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <stdatomic.h> #include <sys/time.h> void* count0(void* data) { int* p = (int*)data; for (int i = 0; i < 1e6; i++) { (*p)++; } return 0; } void test0(int concurrent) { int t = 0; pthread_t* pt = malloc(sizeof(pthread_t)* concurrent); for (int i = 0; i < concurrent; i++) pthread_create(pt + i, NULL, count0, &t); for (int i = 0; i < concurrent; i++) pthread_join(pt[i], NULL); printf("test0(simple) t=%d ", t); free(pt); } int main() { const int concurrent = 3; test0(concurrent); }
编译代码
gcc test.c -lpthread
结果一般都会小于300万,因为自增不是原子操作,而线程也并未做任何同步措施。
要解决这个通常可以采取原子类型或者直接加锁,代码如下:
【例子二】原子操作:
void* count1(void* data) { atomic_int* p = (atomic_int*)data; for (int i = 0; i < 1e6; i++) { //atomic_fetch_add(p, 1); atomic_fetch_add_explicit(p, 1, __ATOMIC_SEQ_CST ); } return 0; } void test1(int concurrent) { atomic_int t = ATOMIC_VAR_INIT(0); //printf("atomic_is_lock_free=%d ", atomic_is_lock_free(&t)); pthread_t* pt = malloc(sizeof(pthread_t)* concurrent); for (int i = 0; i < concurrent; i++) pthread_create(pt + i, NULL, count1, &t); for (int i = 0; i < concurrent; i++) pthread_join(pt[i], NULL); printf("test1(atomic) t=%d ", t); free(pt); }
【例子三】加锁:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void* count2(void* data) { int* p = (int*)data; for (int i = 0; i < 1e6; i++) { pthread_mutex_lock(&mutex); (*p)++; pthread_mutex_unlock(&mutex); } return 0; } void test2(int concurrent) { int t = 0; pthread_t* pt = malloc(sizeof(pthread_t)* concurrent); for (int i = 0; i < concurrent; i++) pthread_create(pt + i, NULL, count2, &t); for (int i = 0; i < concurrent; i++) pthread_join(pt[i], NULL); printf("test2(lock) t=%d ", t); free(pt); }
上面两个代码都可以得到正确的预期结果,不过如何加锁和使用原子操作都不是我们这里要讨论的重点,我们希望了解这两种方式的底层原理。
回到例子一,如果将gcc编译选项加上-O1呢?结果就正确了,但我们需要知道原因。
提到自增操作是非原子的,我们写个简单例子看下
【例子四】test0.c
int main() { int a = 7; int b = 3; for (int n=0; n<10000; n++) a++; b = a; return b; }
查看其反汇编
gcc test0.c -c -S -O0
结果:
.section __TEXT,__text,regular,pure_instructions .build_version macos, 10, 15 sdk_version 10, 15, 6 .globl _main ## -- Begin function main .p2align 4, 0x90 _main: ## @main .cfi_startproc ## %bb.0: pushq %rbp .cfi_def_cfa_offset 16 .cfi_offset %rbp, -16 movq %rsp, %rbp .cfi_def_cfa_register %rbp movl $0, -4(%rbp) movl $7, -8(%rbp) movl $3, -12(%rbp) movl $0, -16(%rbp) LBB0_1: ## =>This Inner Loop Header: Depth=1 cmpl $10000, -16(%rbp) ## imm = 0x2710 jge LBB0_4 ## %bb.2: ## in Loop: Header=BB0_1 Depth=1 movl -8(%rbp), %eax addl $1, %eax movl %eax, -8(%rbp) ## %bb.3: ## in Loop: Header=BB0_1 Depth=1 movl -16(%rbp), %eax addl $1, %eax movl %eax, -16(%rbp) jmp LBB0_1 LBB0_4: movl -8(%rbp), %eax movl %eax, -12(%rbp) movl -12(%rbp), %eax popq %rbp retq .cfi_endproc ## -- End function .subsections_via_symbols
其中%rbp是栈指针,从## %bb.2: 段代码可见,自增操作分为3步:
- 内存变量值写到寄存器%eax;
- %eax 寄存器值加1;
- %eax 寄存器值回写到内存中;
同样,从## %bb.3: 段代码可见,赋值操作也分为类似3步。
所以例子一中,多线程并发对同一个计数器自增,会导致一些操作的丢失。
如果gcc加上优化选项:
gcc test0.c -c -S -O1
反汇编结果如下:
.section __TEXT,__text,regular,pure_instructions .build_version macos, 10, 15 sdk_version 10, 15, 6 .globl _main ## -- Begin function main .p2align 4, 0x90 _main: ## @main .cfi_startproc ## %bb.0: pushq %rbp .cfi_def_cfa_offset 16 .cfi_offset %rbp, -16 movq %rsp, %rbp .cfi_def_cfa_register %rbp movl $10007, %eax ## imm = 0x2717 popq %rbp retq .cfi_endproc ## -- End function .subsections_via_symbols
编译器生成的汇编代码大大简化了,甚至都丢掉了 b=3 的赋值语句(因为编译器认为丢掉它不影响后面b=a的结果)。
如果在变量b的声明处加上volatile关键字,
int main() { int a = 7; volatile int b = 3; for (int n=0; n<10000; n++) a++; b = a; return b; }
同样-O1选项下的反汇编结果:
.section __TEXT,__text,regular,pure_instructions .build_version macos, 10, 15 sdk_version 10, 15, 6 .globl _main ## -- Begin function main .p2align 4, 0x90 _main: ## @main .cfi_startproc ## %bb.0: pushq %rbp .cfi_def_cfa_offset 16 .cfi_offset %rbp, -16 movq %rsp, %rbp .cfi_def_cfa_register %rbp movl $3, -4(%rbp) movl $10007, -4(%rbp) ## imm = 0x2717 movl -4(%rbp), %eax popq %rbp retq .cfi_endproc ## -- End function .subsections_via_symbols
加上volatile之后,对变量b的读写操作都会直接作用于其内存地址,也不会再忽略 b=3 这条语句了。
volatile
我们知道volatile修饰的变量在每次被线程访问时,都强迫从内存中重读该变量的值。而且,当变量值发生变化时,强迫线程将变化值回写到内存。这样在任何时刻,两个不同的线程总是看到某个变量的同一个值。那么volatile 能代替原子操作吗,看个例子
typedef struct { volatile int count; } atomic_t; void* count3(void* data) { atomic_t* p = (atomic_t*)data; for (int i = 0; i < 1e6; i++) { (p->count)++; } return 0; } void test3(int concurrent) { atomic_t t = {0}; pthread_t* pt = malloc(sizeof(pthread_t)* concurrent); for (int i = 0; i < concurrent; i++) pthread_create(pt + i, NULL, count3, &t); for (int i = 0; i < concurrent; i++) pthread_join(pt[i], NULL); printf("test3(volatile) t=%d ", t.count); free(pt); }
结果不会是300万。也就是说 volatile不能保证所修饰的变量进行原子操作,它不能解决多线程的问题!
如果想让上面代码实现原子操作,可以使用如下方式:
void atomic_inc(atomic_t* v) { asm volatile( "lock incl %0" :"+m"(v->count) // output operands : // input operands : // list of clobbered registers ); } void* count4(void* data) { atomic_t* p = (atomic_t*)data; for (int i = 0; i < 1e6; i++) { atomic_inc(p); } return 0; } void test4(int concurrent) { atomic_t t = {0}; pthread_t* pt = malloc(sizeof(pthread_t)* concurrent); for (int i = 0; i < concurrent; i++) pthread_create(pt + i, NULL, count4, &t); for (int i = 0; i < concurrent; i++) pthread_join(pt[i], NULL); printf("test4(volatile_atomic) t=%d ", t.count); free(pt); }
该例子使用内联汇编实现了一个自增的原子操作,原理是修改变量前,先通过LOCK#命令锁住内存总线,从而阻止其它CPU的内存操作。
类似的,下面例子是实现一个自旋锁:
typedef struct { volatile int count; } atomic_t; static inline int atomic_xchg(atomic_t* v, int i) { int ret; asm volatile( "xchgl %0, %1" :"=r"(ret) :"m"(v->count),"0"(i) ); return ret; } void mutex_lock(atomic_t* v) { while (1 == atomic_xchg(v, 1)) { sched_yield(); } } void mutex_unlock(atomic_t* v) { v->count = 0; }
实际上,大部分锁机制也是通过compare-and-exchange这类命令实现的。
原子操作
以常见的X86 CPU来说,它基以下三种机制保证了多核中加锁的原子操作:
- 一些基本的内存读写操作是本身已经被硬件提供了原子性保证(例如读写单个字节的操作);
- 一些需要保证原子性但是没有被第(1)条机制提供支持的操作(例如read-modify-write)可以通过使用”LOCK#”来锁定总线,从而保证操作的原子性;
- 因为很多内存数据是已经存放在L1/L2 cache中了,对这些数据的原子操作只需要与本地的cache打交道,而不需要与总线打交道,所以CPU就提供了cache coherency机制来保证其它的那些也cache了这些数据的processor能读到最新的值。
对其中第一点而言,以下的基本内存操作是原子的:
- Reading or writing a byte(一个字节的读写)
- Reading or writing a word aligned on a 16-bit boundary(对齐到16位边界的字的读写)
- Reading or writing a doubleword aligned on a 32-bit boundary(对齐到32位边界的双字的读写)
- Reading or writing a quadword aligned on a 64-bit boundary(对齐到64位边界的四字的读写)
- 16-bit accesses to uncached memory locations that fit within a 32-bit data bus(未缓存且在32位数据总线范围之内的内存地址的访问)
- Unaligned 16-, 32-, and 64-bit accesses to cached memory that fit within a cache line(对单个cache line中缓存地址的未对齐的16/32/64位访问)
对于不以16位、32位对齐的读写,可以参考bit field的例子。bit field 常用来高效的存储有限位数的变量,多用于内核/底层开发中。
一般来说,对同一个结构体内的不同bit成员的多线程访问是无法保证线程安全的,看个例子:
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
typedef struct _foo {
int flag : 1;
int counter : 15;
} foo;
pthread_mutex_t my_mutex_for_flag = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t my_mutex_for_counter = PTHREAD_MUTEX_INITIALIZER;
/* in thread 1 */
void* fn1 (void* data) {
foo* my_foo = (foo*) data;
pthread_mutex_lock(&my_mutex_for_flag);
for (int n = 0; n < 1000; n++) my_foo->flag = !my_foo->flag;
pthread_mutex_unlock(&my_mutex_for_flag);
return 0;
}
/* in thread 2 */
void* fn2 (void* data) {
foo* my_foo = (foo*) data;
pthread_mutex_lock(&my_mutex_for_counter);
for (int n = 0; n < 1000; n++) ++my_foo->counter;
pthread_mutex_unlock(&my_mutex_for_counter);
return 0;
}
int main() {
pthread_t t1, t2;
foo f = {0, 0};
pthread_create(&t1, NULL, fn1, &f);
pthread_create(&t2, NULL, fn2, &f);
pthread_join(t1, 0);
pthread_join(t2, 0);
printf("flag=%d; counter=%d
", f.flag, f.counter);
}
反复执行时,结果如下:
flag=0; counter=1000
flag=-1; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=-1; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=1000
flag=0; counter=946
可见,如果 foo.flag 和 foo.counter 存储在同一个word里,CPU在读写任何一个bit member的时候会同时把两个值一起读进寄存器,从而造成读写冲突。这个例子正确的处理方式是用一个mutex同时保护 foo.flag 和 foo.counter,这样才能确保读写是线程安全的。
参考文档:
http://ericnode.info/post/atomic_in_c11/
http://www.parallellabs.com/2010/04/15/atomic-operation-in-multithreaded-application/
https://liam.page/2018/01/18/volatile-in-C-and-Cpp/