无锁队列https://coolshell.cn/articles/8239.html
链表实现 cas
入队时注意 lock free(锁无关)问题 防止死锁
Tail
出队
如果是用了指针小心 aba问题(指针内存重用)
解决
1、使用double-CAS(双保险的CAS)
1)一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器。
2)只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。
2、不让那个地址重用
使用结点内存引用计数,就是那块内存的引用加1阻止被回收,搞完后再减一,这两个操作都是原子操作
数组实现 环形数组(求模) cas 原子加减
声明两个计数器,一个用来计数入队的次数,一个用来计数出队的次数。
入队
定位TAIL的位置,double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。如果找不到(TAIL, EMPTY),则说明队列满了。再原子入队加
出队
定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。再原子出队加
Linux内核的kfifo[编辑] 这个就有用到内存屏蔽
在Linux内核文件kfifo.h和kfifo.c中,定义了一个先进先出圆形缓冲区实现。如果只有一个读线程、一个写线程,二者没有共享的被修改的控制变量,那么可以证明这种情况下不需要并发控制。kfifo就满足上述条件。kfifo要求缓冲区长度必须为2的幂。读、写指针分别是无符号整型变量。把读写指针变换为缓冲区内的索引值,仅需要“按位与”操作:(指针值 按位与 (缓冲区长度-1))。这避免了计算代价高昂的“求余”操作。且下述关系总是成立:
- 读指针 + 缓冲区存储的数据长度 == 写指针
即使在写指针达到了无符号整型的上界,上溢出后写指针的值小于读指针的值,上述关系仍然保持成立(这是因为无符号整型加法的性质)。 kfifo的写操作,首先计算缓冲区中当前可写入存储空间的数据长度:
- len = min{待写入数据长度, 缓冲区长度 - (写指针 - 读指针)}
然后,分两段写入数据。第一段是从写指针开始向缓冲区末尾方向;第二段是从缓冲区起始处写入余下的可写入数据,这部分可能数据长度为0即并无实际数据写入。 这个就有用到内存屏蔽
内存屏蔽 从技术上来讲,这即是说一个“顺序一致”的模型,其中读和写操作的执行顺序跟它们在源代码中的顺序是完全一样的[8]。
duumy 伪头部,duumy->next才是真正存有数据的head
我们通常访问共享资源会通过互斥访问来确保共享资源的独占或者叫串行访问,比如mutex锁,那锁到底帮我们做了哪些工作来保证正确性的呢?两个点
- 锁内部有内存屏障的指令来解决编译造成的乱序
- 锁实现的原子功能来解决运行时乱序
举一个我们在实际项目中遇到的问题,我们在两个进程之间实现了一个共享内存,用来传递一些消息和命令,为了高性能消息的写和读之前没有加信号量而是用cpu polling来查看消息状态,结果造成读到的消息不是最新的想要的消息。我理解信号量帮我们解决了内存屏障的问题。
http://blog.sina.com.cn/s/blog_694f2ae70101gcmm.html
二 原子操作 cpu指令
对于原子操作,在上一篇的介绍中,原子操作同时是一种锁机制,之所以称之为锁,主要是在读操作中屏蔽内存,也就是获取资源,读取,释放资源的角度,原子操作的确是一种锁机制。但是由于原子操作的颗粒度 可以说是指令集的,在线程级看来是无等待的操作,因为没有任何的进程或者中断对资源进行等待。因此原子操作也可以看作是一种无锁实现方案。
三 顺序锁seqlock 写时自增计数值,读前和读后判断读的计数值是否一致,不一致重新读, 适用于 读多写少
顺序锁跟读写锁很相似,但是顺序锁对于写来说更有利,即使在读操作进行的时候,写操作也是会进行的。Seqlock应用在读很频繁,写操作很少的情况下,而且这个写应该是比较罕见的。比如系统时间,由于系统时间很少进行修改,比较适合使用seqlock。先看一下seq在内核中的实现:
typedef struct {
unsigned sequence;
spinlock_t lock;
} seqlock_t;
static inline void write_seqlock(seqlock_t *sl)
{
spin_lock(&sl->lock);
++sl->sequence;
smp_wmb();
}
static inline void write_sequnlock(seqlock_t *sl)
{
smp_wmb();
sl->sequence++;
spin_unlock(&sl->lock);
}
do {
seq = read_seqbegin(&foo);
...
} while (read_seqretry(&foo, seq));
Seqlock原理就是有一个sequence计数器,当写操作中对计数器加自选锁,并将sequence加1操作。对于读操作,则在开始时读取这个sequence值,在操作完成之后重新读取此值,如果前后值不一致的话,说明中间有写操作,重新操作一遍,直到一致为止。
seqlock的优点可以看出,写操作是立马执行的,而读操作则有可能发生重复的读写。因此seqlock使用主要是在读操作很多但是写操作很少发生的时候使用seqlock。
四 RCU(Read-Copy-Update)
RCU(Read-Copy Update),通过字面意思是读-拷贝修改。主要是什么原理呢。在RCU的使用过程中,读操作不需要等等待,直接进行读操作就可以访问。写操作,首先建立一个副本,修改完毕之后通过一个callback在适当的时机将原来老的指针进行替换掉。通过下图可以对RCU锁进行简单的认识。
RCU根据reader updater 和declaimer可以用如下的图进行理解:
根据使用延时和保护的优先级大体的使用接口如下:
Defer Protect
a. synchronize_rcu() rcu_read_lock() / rcu_read_unlock()
call_rcu() rcu_dereference()
b. call_rcu_bh() rcu_read_lock_bh() / rcu_read_unlock_bh()
rcu_dereference_bh()
c. synchronize_sched() rcu_read_lock_sched() / rcu_read_unlock_sched()
preempt_disable() / preempt_enable()
local_irq_save() / local_irq_restore()
hardirq enter / hardirq exit
NMI enter / NMI exit
rcu_dereference_sched()
- 主要是应用在用户级的数据的保护和更新
- 主要是应用在softirq级的数据保护和更新
- 主要是应用在中断级和调度层的保护和更新
下面举一个kernel documents的例子:
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp); //将old 从队列中删除,更新RCU指针
spin_unlock(&foo_mutex);
synchronize_rcu(); //等待其他的读保护执行完毕
kfree(old_fp); // 释放回收Old占用的资源
}
int foo_get_a(void)
{
int retval;
rcu_read_lock(); // 读保护
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
}
注:RCU是LINUX比较复杂的锁算法,以后会对RCU进行详细的分析。具体的使用可以参考kernel document里面的文件,对于RCU的各种不同的场景应用都有相应的例子作为参考。这里不再赘述。
替换掉旧的数据,并且删掉旧的数据,要注意旧的数据上有没有访问 (弄多个访问计数值,为0才能删除)