RCU介绍

RCU原理:

  

  RCU(Read-Copy Update),顾名思义就是读-拷贝修改,它是基于其原理命名的。对于被RCU保护的共享数据结构,读者不需要获得任何锁就可以访问它,但写者在访问它时首先拷贝一个副本,然后对副本进行修改,最后使用一个回调(callback)机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据。这个时机就是所有引用该数据的CPU都退出对共享数据的操作。

因此RCU实际上是一种改进的rwlock,读者几乎没有什么同步开销,它不需要锁,不使用原子指令,而且在除alpha的所有架构上也不需要内存栅(Memory Barrier),因此不会导致锁竞争,内存延迟以及流水线停滞。不需要锁也使得使用更容易,因为死锁问题就不需要考虑了。写者的同步开销比较大,它需要延迟数据结构的释放,复制被修改的数据结构,它也必须使用某种锁机制同步并行的其它写者的修改操作。读者必须提供一个信号给写者以便写者能够确定数据可以被安全地释放或修改的时机。有一个专门的垃圾收集器来探测读者的信号,一旦所有的读者都已经发送信号告知它们都不在使用被RCU保护的数据结构,垃圾收集器就调用回调函数完成最后的数据释放或修改操作。 RCU与rwlock的不同之处是:它既允许多个读者同时访问被保护的数据,又允许多个读者和多个写者同时访问被保护的数据(注意:是否可以有多个写者并行访问取决于写者之间使用的同步机制),读者没有任何同步开销,而写者的同步开销则取决于使用的写者间同步机制。但RCU不能替代rwlock,因为如果写比较多时,对读者的性能提高不能弥补写者导致的损失。

读者在访问被RCU保护的共享数据期间不能被阻塞,这是RCU机制得以实现的一个基本前提,也就说当读者在引用被RCU保护的共享数据期间,读者所在的CPU不能发生上下文切换,spinlock和rwlock都需要这样的前提。写者在访问被RCU保护的共享数据时不需要和读者竞争任何锁,只有在有多于一个写者的情况下需要获得某种锁以与其他写者同步。写者修改数据前首先拷贝一个被修改元素的副本,然后在副本上进行修改,修改完毕后它向垃圾回收器注册一个回调函数以便在适当的时机执行真正的修改操作。等待适当时机的这一时期称为grace period,而CPU发生了上下文切换称为经历一个quiescent state,grace period就是所有CPU都经历一次quiescent state所需要的等待的时间。垃圾收集器就是在grace period之后调用写者注册的回调函数来完成真正的数据修改或数据释放操作的。

1、http://www.ibm.com/developerworks/cn/linux/l-rcu/

2、http://blog.csdn.net/xabc3000/article/details/15335131

3、http://blog.csdn.net/jianchaolv/article/details/7718578

4、http://blog.csdn.net/ustc_dylan/article/details/4049647

Linux 2.6内核中新的锁机制--RCU

一、 引言

众所周知,为了保护共享数据,需要一些同步机制,如自旋锁(spinlock),读写锁(rwlock),它们使用起来非常简单,而且是一种很有效的同步机制,在UNIX系统和Linux系统中得到了广泛的使用。但是随着计算机硬件的快速发展,获得这种锁的开销相对于CPU的速度在成倍地增加,原因很简单,CPU的速度与访问内存的速度差距越来越大,而这种锁使用了原子操作指令,它需要原子地访问内存,也就说获得锁的开销与访存速度相关,另外在大部分非x86架构上获取锁使用了内存栅(Memory Barrier),这会导致处理器流水线停滞或刷新,因此它的开销相对于CPU速度而言就越来越大。表1数据证明了这一点。

表1 在700MHz奔腾III机器上一些操作的开销

表1是在700MHz的奔腾III机器上的基本操作的开销,在该机器上一个时钟周期能够执行两条整数指令。在1.8GHz的奔腾4机器上, 原子加1指令的开销要比700MHz的奔腾III机器慢75纳秒(ns),尽管CPU速度快两倍多。

这种锁机制的另一个问题在于其可扩展性,在多处理器系统上,可扩展性非常重要,否则根本无法发挥其性能。图1表明了Linux上各种锁的扩展性。

图 1 Linux的4种锁机制的扩展性

图 1  Linux的4种锁机制的扩展性

注:refcnt表示自旋锁与引用记数一起使用。

读写锁rwlock在两个CPU的情况下性能反倒比一个CPU的差,在四个CPU的情况下,refcnt的性能要高于rwlock,refcnt大约是理论性能的45%,而rwlock是理论性能的39%,自旋缩spinlock的性能明显好于refcnt和rwlock,但它也只达到了理性性能的57%,brlock(Big Reader Lock)性能可以线性扩展。Brlock是由Redhat的Ingo Molnar实现的一个高性能的rwlock,它适用于读特多而写特少的情况,读者获得brlock的开销很低,但写者获得锁的开销非常大,而且它只预定义了几个锁,用户无法随便定义并使用这种锁,它也需要为每个CPU定义一个锁状态数组,因此这种锁并没有被作为rwlock的替代方案广泛使用,只是在一些特别的地方使用到。

正是在这种背景下,一个高性能的锁机制RCU呼之欲出,它克服了以上锁的缺点,具有很好的扩展性,但是这种锁机制的使用范围比较窄,它只适用于读多写少的情况,如网络路由表的查询更新、设备状态表的维护、数据结构的延迟释放以及多径I/O设备的维护等。

RCU并不是新的锁机制,它只是对Linux内核而言是新的。早在二十世纪八十年代就有了这种机制,而且在生产系

统中使用了这种机制,但这种早期的实现并不太好,在二十世纪九十年代出现了一个比较高效的实现,而在linux中是在开发内核2.5.43中引入该技术的并正式包含在2.6内核中。

二、RCU的原理

RCU(Read-Copy Update),顾名思义就是读-拷贝修改,它是基于其原理命名的。对于被RCU保护的共享数据结构,读者不需要获得任何锁就可以访问它,但写者在访问它时首先拷贝一个副本,然后对副本进行修改,最后使用一个回调(callback)机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据。这个时机就是所有引用该数据的CPU都退出对共享数据的操作。

因此RCU实际上是一种改进的rwlock,读者几乎没有什么同步开销,它不需要锁,不使用原子指令,而且在除alpha的所有架构上也不需要内存栅(Memory Barrier),因此不会导致锁竞争,内存延迟以及流水线停滞。不需要锁也使得使用更容易,因为死锁问题就不需要考虑了。写者的同步开销比较大,它需要延迟数据结构的释放,复制被修改的数据结构,它也必须使用某种锁机制同步并行的其它写者的修改操作。读者必须提供一个信号给写者以便写者能够确定数据可以被安全地释放或修改的时机。有一个专门的垃圾收集器来探测读者的信号,一旦所有的读者都已经发送信号告知它们都不在使用被RCU保护的数据结构,垃圾收集器就调用回调函数完成最后的数据释放或修改操作。 RCU与rwlock的不同之处是:它既允许多个读者同时访问被保护的数据,又允许多个读者和多个写者同时访问被保护的数据(注意:是否可以有多个写者并行访问取决于写者之间使用的同步机制),读者没有任何同步开销,而写者的同步开销则取决于使用的写者间同步机制。但RCU不能替代rwlock,因为如果写比较多时,对读者的性能提高不能弥补写者导致的损失。

读者在访问被RCU保护的共享数据期间不能被阻塞,这是RCU机制得以实现的一个基本前提,也就说当读者在引用被RCU保护的共享数据期间,读者所在的CPU不能发生上下文切换,spinlock和rwlock都需要这样的前提。写者在访问被RCU保护的共享数据时不需要和读者竞争任何锁,只有在有多于一个写者的情况下需要获得某种锁以与其他写者同步。写者修改数据前首先拷贝一个被修改元素的副本,然后在副本上进行修改,修改完毕后它向垃圾回收器注册一个回调函数以便在适当的时机执行真正的修改操作。等待适当时机的这一时期称为grace period,而CPU发生了上下文切换称为经历一个quiescent state,grace period就是所有CPU都经历一次quiescent state所需要的等待的时间。垃圾收集器就是在grace period之后调用写者注册的回调函数来完成真正的数据修改或数据释放操作的。

以下以链表元素删除为例详细说明这一过程。

写者要从链表中删除元素 B,它首先遍历该链表得到指向元素 B 的指针,然后修改元素 B 的前一个元素的 next 指针指向元素 B 的 next 指针指向的元素C,修改元素 B 的 next 指针指向的元素 C 的 prep 指针指向元素 B 的 prep指针指向的元素 A,在这期间可能有读者访问该链表,修改指针指向的操作是原子的,所以不需要同步,而元素 B 的指针并没有去修改,因为读者可能正在使用 B 元素来得到下一个或前一个元素。写者完成这些操作后注册一个回调函数以便在 grace period 之后删除元素 B,然后就认为已经完成删除操作。垃圾收集器在检测到所有的CPU不在引用该链表后,即所有的 CPU 已经经历了 quiescent state,grace period 已经过去后,就调用刚才写者注册的回调函数删除了元素 B。

图 2 使用 RCU 进行链表删除操作

图 2  使用 RCU 进行链表删除操作

三、RCU 实现机制

按照第二节所讲原理,对于读者,RCU 仅需要抢占失效,因此获得读锁和释放读锁分别定义为:

#define rcu_read_lock()         preempt_disable()
#define rcu_read_unlock()       preempt_enable()

它们有一个变种:

#define rcu_read_lock_bh()      local_bh_disable()
#define rcu_read_unlock_bh()    local_bh_enable()

这个变种只在修改是通过 call_rcu_bh 进行的情况下使用,因为 call_rcu_bh将把 softirq 的执行完毕也认为是一个 quiescent state,因此如果修改是通过 call_rcu_bh 进行的,在进程上下文的读端临界区必须使用这一变种。

每一个 CPU 维护两个数据结构rcu_data,rcu_bh_data,它们用于保存回调函数,函数call_rcu和函数call_rcu_bh用户注册回调函数,前者把回调函数注册到rcu_data,而后者则把回调函数注册到rcu_bh_data,在每一个数据结构上,回调函数被组成一个链表,先注册的排在前头,后注册的排在末尾。

当在CPU上发生进程切换时,函数rcu_qsctr_inc将被调用以标记该CPU已经经历了一个quiescent state。该函数也会被时钟中断触发调用。

时钟中断触发垃圾收集器运行,它会检查:

  1. 否在该CPU上有需要处理的回调函数并且已经经过一个grace period;
  2. 否没有需要处理的回调函数但有注册的回调函数;
  3. 否该CPU已经完成回调函数的处理;
  4. 否该CPU正在等待一个quiescent state的到来;

如果以上四个条件只要有一个满足,它就调用函数rcu_check_callbacks。

函数rcu_check_callbacks首先检查该CPU是否经历了一个quiescent state,如果:

1. 当前进程运行在用户态;

2. 当前进程为idle且当前不处在运行softirq状态,也不处在运行IRQ处理函数的状态;

那么,该CPU已经经历了一个quiescent state,因此通过调用函数rcu_qsctr_inc标记该CPU的数据结构rcu_data和rcu_bh_data的标记字段passed_quiesc,以记录该CPU已经经历一个quiescent state。

否则,如果当前不处在运行softirq状态,那么,只标记该CPU的数据结构rcu_bh_data的标记字段passed_quiesc,以记录该CPU已经经历一个quiescent state。注意,该标记只对rcu_bh_data有效。

然后,函数rcu_check_callbacks将调用tasklet_schedule,它将调度为该CPU设置的tasklet rcu_tasklet,每一个CPU都有一个对应的rcu_tasklet。

在时钟中断返回后,rcu_tasklet将在softirq上下文被运行。

rcu_tasklet将运行函数rcu_process_callbacks,函数rcu_process_callbacks可能做以下事情:

1. 开始一个新的grace period;这通过调用函数rcu_start_batch实现。

2. 运行需要处理的回调函数;这通过调用函数rcu_do_batch实现。

3. 检查该CPU是否经历一个quiescent state;这通过函数rcu_check_quiescent_state实现

如果还没有开始grace period,就调用rcu_start_batch开始新的grace period。调用函数rcu_check_quiescent_state检查该CPU是否经历了一个quiescent state,如果是并且是最后一个经历quiescent state的CPU,那么就结束grace period,并开始新的grace period。如果有完成的grace period,那么就调用rcu_do_batch运行所有需要处理的回调函数。函数rcu_process_callbacks将对该CPU的两个数据结构rcu_data和rcu_bh_data执行上述操作。

四、RCU API

rcu_read_lock()

读者在读取由RCU保护的共享数据时使用该函数标记它进入读端临界区。

rcu_read_unlock()

该函数与rcu_read_lock配对使用,用以标记读者退出读端临界区。夹在这两个函数之间的代码区称为"读端临界区"(read-side critical section)。读端临界区可以嵌套,如图3,临界区2被嵌套在临界区1内。

图 3 嵌套读端临界区示例

图 3  嵌套读端临界区示例

synchronize_rcu()

该函数由RCU写端调用,它将阻塞写者,直到经过grace period后,即所有的读者已经完成读端临界区,写者才可以继续下一步操作。如果有多个RCU写端调用该函数,他们将在一个grace period之后全部被唤醒。注意,该函数在2.6.11及以前的2.6内核版本中为synchronize_kernel,只是在2.6.12才更名为synchronize_rcu,但在2.6.12中也提供了synchronize_kernel和一个新的函数synchronize_sched,因为以前有很多内核开发者使用synchronize_kernel用于等待所有CPU都退出不可抢占区,而在RCU设计时该函数只是用于等待所有CPU都退出读端临界区,它可能会随着RCU实现的修改而发生语意变化,因此为了预先防止这种情况发生,在新的修改中增加了专门的用于其它内核用户的synchronize_sched函数和只用于RCU使用的synchronize_rcu,现在建议非RCU内核代码部分不使用synchronize_kernel而使用synchronize_sched,RCU代码部分则使用synchronize_rcu,synchronize_kernel之所以存在是为了保证代码兼容性。

synchronize_kernel()

其他非RCU的内核代码使用该函数来等待所有CPU处在可抢占状态,目前功能等同于synchronize_rcu,但现在已经不建议使用,而使用synchronize_sched。

synchronize_sched()

该函数用于等待所有CPU都处在可抢占状态,它能保证正在运行的中断处理函数处理完毕,但不能保证正在运行的softirq处理完毕。注意,synchronize_rcu只保证所有CPU都处理完正在运行的读端临界区。 注:在2.6.12内核中,synchronize_kernel和synchronize_sched都实际使用synchronize_rcu,因此当前它们的功能实际是完全等同的,但是将来将可能有大的变化,因此务必根据需求选择恰当的函数。

void fastcall call_rcu(struct rcu_head *head,
                                void (*func)(struct rcu_head *rcu))
struct rcu_head {
        struct rcu_head *next;
        void (*func)(struct rcu_head *head);
};

函数 call_rcu 也由 RCU 写端调用,它不会使写者阻塞,因而可以在中断上下文或 softirq 使用,而 synchronize_rcu、synchronize_kernel 和synchronize_shced 只能在进程上下文使用。该函数将把函数 func 挂接到 RCU回调函数链上,然后立即返回。一旦所有的 CPU 都已经完成端临界区操作,该函数将被调用来释放删除的将绝不在被应用的数据。参数 head 用于记录回调函数 func,一般该结构会作为被 RCU 保护的数据结构的一个字段,以便省去单独为该结构分配内存的操作。需要指出的是,函数 synchronize_rcu 的实现实际上使用函数call_rcu。

void fastcall call_rcu_bh(struct rcu_head *head,
                                void (*func)(struct rcu_head *rcu))

函数call_ruc_bh功能几乎与call_rcu完全相同,唯一差别就是它把softirq的完成也当作经历一个quiescent state,因此如果写端使用了该函数,在进程上下文的读端必须使用rcu_read_lock_bh。

#define rcu_dereference(p)     ({
                  typeof(p) _________p1 = p;
                  smp_read_barrier_depends();
                  (_________p1);
                  })

该宏用于在RCU读端临界区获得一个RCU保护的指针,该指针可以在以后安全地引用,内存栅只在alpha架构上才使用。

除了这些API,RCU还增加了链表操作的RCU版本,因为对于RCU,对共享数据的操作必须保证能够被没有使用同步机制的读者看到,所以内存栅是非常必要的。

static inline void list_add_rcu(struct list_head *new, struct list_head *head) 该函数把链表项new插入到RCU保护的链表head的开头。使用内存栅保证了在引用这个新插入的链表项之前,新链表项的链接指针的修改对所有读者是可见的。

static inline void list_add_tail_rcu(struct list_head *new,
                   struct list_head *head)

该函数类似于list_add_rcu,它将把新的链表项new添加到被RCU保护的链表的末尾。

static inline void list_del_rcu(struct list_head *entry)

该函数从RCU保护的链表中移走指定的链表项entry,并且把entry的prev指针设置为LIST_POISON2,但是并没有把entry的next指针设置为LIST_POISON1,因为该指针可能仍然在被读者用于便利该链表。

static inline void list_replace_rcu(struct list_head *old, struct list_head *new)

该函数是RCU新添加的函数,并不存在非RCU版本。它使用新的链表项new取代旧的链表项old,内存栅保证在引用新的链表项之前,它的链接指针的修正对所有读者可见。

list_for_each_rcu(pos, head)

该宏用于遍历由RCU保护的链表head,只要在读端临界区使用该函数,它就可以安全地和其它_rcu链表操作函数(如list_add_rcu)并发运行。

list_for_each_safe_rcu(pos, n, head)

该宏类似于list_for_each_rcu,但不同之处在于它允许安全地删除当前链表项pos。

list_for_each_entry_rcu(pos, head, member)

该宏类似于list_for_each_rcu,不同之处在于它用于遍历指定类型的数据结构链表,当前链表项pos为一包含struct list_head结构的特定的数据结构。

list_for_each_continue_rcu(pos, head)

该宏用于在退出点之后继续遍历由RCU保护的链表head。

static inline void hlist_del_rcu(struct hlist_node *n)

它从由RCU保护的哈希链表中移走链表项n,并设置n的ppre指针为LIST_POISON2,但并没有设置next为LIST_POISON1,因为该指针可能被读者使用用于遍利链表。

static inline void hlist_add_head_rcu(struct hlist_node *n,
                   struct hlist_head *h)

该函数用于把链表项n插入到被RCU保护的哈希链表的开头,但同时允许读者对该哈希链表的遍历。内存栅确保在引用新链表项之前,它的指针修正对所有读者可见。

hlist_for_each_rcu(pos, head)

该宏用于遍历由RCU保护的哈希链表head,只要在读端临界区使用该函数,它就可以安全地和其它_rcu哈希链表操作函数(如hlist_add_rcu)并发运行。

hlist_for_each_entry_rcu(tpos, pos, head, member)

类似于hlist_for_each_rcu,不同之处在于它用于遍历指定类型的数据结构哈希链表,当前链表项pos为一包含struct list_head结构的特定的数据结构。

五、RCU 典型应用

在 linux 2.6 内核中,RCU 被内核使用的越来越广泛。下面是在最新的 2.6.12内核中搜索得到的RCU使用情况统计表。

表 1 rcu_read_lock 的使用情况统计

表 1  rcu_read_lock 的使用情况统计

表 2 rcu_read_unlock 的使用情况统计

表 2  rcu_read_unlock 的使用情况统计

表 3 rcu_read_lock_bh 的使用情况统计

表 3  rcu_read_lock_bh 的使用情况统计

表 4 rcu_read_unlock_bh 的使用情况统计

表 4  rcu_read_unlock_bh 的使用情况统计

表 5 call_rcu 的使用情况统计

表 5  call_rcu 的使用情况统计

表 6 call_rcu_bh 的使用情况统计

表 6  call_rcu_bh 的使用情况统计

表 7 list API 的使用情况统计

表 7 list API 的使用情况统计

表 8 synchronize_rcu 的使用情况统计

表 8  synchronize_rcu 的使用情况统计

表 9 rcu_dereferance 的使用情况统计

表 9  rcu_dereferance 的使用情况统计

从以上统计结果可以看出,RCU已经在网络驱动层、网络核心层、IPC、dcache、内存设备层、软RAID层、系统调用审计和SELinux中使用。从所有RCU API的使用统计汇总(表 10),不难看出,RCU已经是一个非常重要的内核锁机制。

表 10 所有RCU API使用情况总汇

表 10  所有RCU API使用情况总汇

因此,如何正确使用 RCU 对于内核开发者而言非常重要。

下面部分将就 RCU 的几种典型应用情况详细讲解。

1.只有增加和删除的链表操作

在这种应用情况下,绝大部分是对链表的遍历,即读操作,而很少出现的写操作只有增加或删除链表项,并没有对链表项的修改操作,这种情况使用RCU非常容易,从rwlock转换成RCU非常自然。路由表的维护就是这种情况的典型应用,对路由表的操作,绝大部分是路由表查询,而对路由表的写操作也仅仅是增加或删除,因此使用RCU替换原来的rwlock顺理成章。系统调用审计也是这样的情况。

这是一段使用rwlock的系统调用审计部分的读端代码:

static enum audit_state audit_filter_task(struct task_struct *tsk)
 {
         struct audit_entry *e;
         enum audit_state   state;
         read_lock(&auditsc_lock);
         /* Note: audit_netlink_sem held by caller. */
         list_for_each_entry(e, &audit_tsklist, list) {
                 if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                         read_unlock(&auditsc_lock);
                         return state;
                 }
         }
         read_unlock(&auditsc_lock);
         return AUDIT_BUILD_CONTEXT;
 }

使用RCU后将变成:

static enum audit_state audit_filter_task(struct task_struct *tsk)
 {
         struct audit_entry *e;
         enum audit_state   state;
         rcu_read_lock();
         /* Note: audit_netlink_sem held by caller. */
         list_for_each_entry_rcu(e, &audit_tsklist, list) {
                 if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                         rcu_read_unlock();
                         return state;
                 }
         }
         rcu_read_unlock();
         return AUDIT_BUILD_CONTEXT;
 }

这种转换非常直接,使用rcu_read_lock和rcu_read_unlock分别替换read_lock和read_unlock,链表遍历函数使用_rcu版本替换就可以了。

使用rwlock的写端代码:

static inline int audit_del_rule(struct audit_rule *rule,
                                  struct list_head *list)
 {
         struct audit_entry  *e;
         write_lock(&auditsc_lock);
         list_for_each_entry(e, list, list) {
                 if (!audit_compare_rule(rule, &e->rule)) {
                         list_del(&e->list);
                         write_unlock(&auditsc_lock);
                         return 0;
                 }
         }
         write_unlock(&auditsc_lock);
         return -EFAULT;         /* No matching rule */
 }
 static inline int audit_add_rule(struct audit_entry *entry,
                                  struct list_head *list)
 {
         write_lock(&auditsc_lock);
         if (entry->rule.flags & AUDIT_PREPEND) {
                 entry->rule.flags &= ~AUDIT_PREPEND;
                 list_add(&entry->list, list);
         } else {
                 list_add_tail(&entry->list, list);
         }
         write_unlock(&auditsc_lock);
         return 0;
 }

使用RCU后写端代码变成为:

static inline int audit_del_rule(struct audit_rule *rule,
                                  struct list_head *list)
 {
         struct audit_entry  *e;
         /* Do not use the _rcu iterator here, since this is the only
          * deletion routine. */
         list_for_each_entry(e, list, list) {
                 if (!audit_compare_rule(rule, &e->rule)) {
                         list_del_rcu(&e->list);
                         call_rcu(&e->rcu, audit_free_rule, e);
                         return 0;
                 }
         }
         return -EFAULT;         /* No matching rule */
 }
 static inline int audit_add_rule(struct audit_entry *entry,
                                  struct list_head *list)
 {
         if (entry->rule.flags & AUDIT_PREPEND) {
                 entry->rule.flags &= ~AUDIT_PREPEND;
                 list_add_rcu(&entry->list, list);
         } else {
                 list_add_tail_rcu(&entry->list, list);
         }
         return 0;
 }

对于链表删除操作,list_del替换为list_del_rcu和call_rcu,这是因为被删除的链表项可能还在被别的读者引用,所以不能立即删除,必须等到所有读者经历一个quiescent state才可以删除。另外,list_for_each_entry并没有被替换为list_for_each_entry_rcu,这是因为,只有一个写者在做链表删除操作,因此没有必要使用_rcu版本。

通常情况下,write_lock和write_unlock应当分别替换成spin_lock和spin_unlock,但是对于只是对链表进行增加和删除操作而且只有一个写者的写端,在使用了_rcu版本的链表操作API后,rwlock可以完全消除,不需要spinlock来同步读者的访问。对于上面的例子,由于已经有audit_netlink_sem被调用者保持,所以spinlock就没有必要了。

这种情况允许修改结果延后一定时间才可见,而且写者对链表仅仅做增加和删除操作,所以转换成使用RCU非常容易。

2.写端需要对链表条目进行修改操作

如果写者需要对链表条目进行修改,那么就需要首先拷贝要修改的条目,然后修改条目的拷贝,等修改完毕后,再使用条目拷贝取代要修改的条目,要修改条目将被在经历一个grace period后安全删除。

对于系统调用审计代码,并没有这种情况。这里假设有修改的情况,那么使用rwlock的修改代码应当如下:

static inline int audit_upd_rule(struct audit_rule *rule,
                                  struct list_head *list,
                                  __u32 newaction,
                                  __u32 newfield_count)
 {
         struct audit_entry  *e;
         struct audit_newentry *ne;
         write_lock(&auditsc_lock);
         /* Note: audit_netlink_sem held by caller. */
         list_for_each_entry(e, list, list) {
                 if (!audit_compare_rule(rule, &e->rule)) {
                         e->rule.action = newaction;
                         e->rule.file_count = newfield_count;
                         write_unlock(&auditsc_lock);
                         return 0;
                 }
         }
         write_unlock(&auditsc_lock);
         return -EFAULT;         /* No matching rule */
 }

如果使用RCU,修改代码应当为;

static inline int audit_upd_rule(struct audit_rule *rule,
                                   struct list_head *list,
                                   __u32 newaction,
                                   __u32 newfield_count)
  {
          struct audit_entry  *e;
          struct audit_newentry *ne;
          list_for_each_entry(e, list, list) {
                  if (!audit_compare_rule(rule, &e->rule)) {
                          ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
                          if (ne == NULL)
                                  return -ENOMEM;
                          audit_copy_rule(&ne->rule, &e->rule);
                          ne->rule.action = newaction;
                          ne->rule.file_count = newfield_count;
                          list_replace_rcu(e, ne);
                          call_rcu(&e->rcu, audit_free_rule, e);
                          return 0;
                  }
          }
          return -EFAULT;         /* No matching rule */
  }

3.修改操作立即可见

前面两种情况,读者能够容忍修改可以在一段时间后看到,也就说读者在修改后某一时间段内,仍然看到的是原来的数据。在很多情况下,读者不能容忍看到旧的数据,这种情况下,需要使用一些新措施,如System V IPC,它在每一个链表条目中增加了一个deleted字段,标记该字段是否删除,如果删除了,就设置为真,否则设置为假,当代码在遍历链表时,核对每一个条目的deleted字段,如果为真,就认为它是不存在的。

还是以系统调用审计代码为例,如果它不能容忍旧数据,那么,读端代码应该修改为:

static enum audit_state audit_filter_task(struct task_struct *tsk)
 {
         struct audit_entry *e;
         enum audit_state   state;
         rcu_read_lock();
         list_for_each_entry_rcu(e, &audit_tsklist, list) {
                 if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                         spin_lock(&e->lock);
                         if (e->deleted) {
                                 spin_unlock(&e->lock);
                                 rcu_read_unlock();
                                 return AUDIT_BUILD_CONTEXT;
                         }
                         rcu_read_unlock();
                         return state;
                 }
         }
         rcu_read_unlock();
         return AUDIT_BUILD_CONTEXT;
 }

注意,对于这种情况,每一个链表条目都需要一个spinlock保护,因为删除操作将修改条目的deleted标志。此外,该函数如果搜索到条目,返回时应当保持该条目的锁,因为只有这样,才能看到新的修改的数据,否则,仍然可能看到就的数据。

写端的删除操作将变成:

static inline int audit_del_rule(struct audit_rule *rule,
                                  struct list_head *list)
 {
         struct audit_entry  *e;
         /* Do not use the _rcu iterator here, since this is the only
          * deletion routine. */
         list_for_each_entry(e, list, list) {
                 if (!audit_compare_rule(rule, &e->rule)) {
                         spin_lock(&e->lock);
                         list_del_rcu(&e->list);
                         e->deleted = 1;
                         spin_unlock(&e->lock);
                         call_rcu(&e->rcu, audit_free_rule, e);
                         return 0;
                 }
         }
         return -EFAULT;         /* No matching rule */
 }

删除条目时,需要标记该条目为已删除。这样读者就可以通过该标志立即得知条目是否已经删除。

六、小结

RCU是2.6内核引入的新的锁机制,在绝大部分为读而只有极少部分为写的情况下,它是非常高效的,因此在路由表维护、系统调用审计、SELinux的AVC、dcache和IPC等代码部分中,使用它来取代rwlock来获得更高的性能。但是,它也有缺点,延后的删除或释放将占用一些内存,尤其是对嵌入式系统,这可能是非常昂贵的内存开销。此外,写者的开销比较大,尤其是对于那些无法容忍旧数据的情况以及不只一个写者的情况,写者需要spinlock或其他的锁机制来与其他写者同步。

在作者先前的两篇文章"Linux 实时技术与典型实现分析, 第 1 部分: 介绍"和"Linux 实时技术与典型实现分析, 第 2 部分: Ingo Molnar 的实时补丁"中,Ingo Molnar的实时实现要求RCU读端临界区可抢占,而RCU的实现的前提是读端临界区不可抢占,因此如何解决这一矛盾但同时不损害RCU的性能是RCU未来的一大挑战。

参考资料

[1] Linux RCU实现者之一Paul E. McKenney的RCU资源链接,http://www.rdrop.com/users/paulmck/rclock/

[2] Paul E. McKenney的博士论文,"Exploiting Deferred Destruction: An Analysis of Read-Copy Update Techniques in Operating System Kernels",http://www.rdrop.com/users/paulmck/rclock/RCUdissertation.2004.07.14e1.pdf

[3] Paul E. McKenney's paper in Ottawa Linux Summit 2002, Read-Copy Update,http://www.rdrop.com/users/paulmck/rclock/rcu.2002.07.08.pdf

[4] Linux Journal在2003年10月对RCU的简介, Kernel Korner - Using RCU in the Linux 2.5 Kernel,http://linuxjournal.com/article/6993

[5] Scaling dcache with RCU, http://linuxjournal.com/article/7124

[6] Patch: Real-Time Preemption and RCU,http://lwn.net/Articles/128228/

[7] Using Read-Copy Update Techniques for System V IPC in the Linux 2.5 Kernel, http://www.rdrop.com/users/paulmck/rclock/rcu.FREENIX.2003.06.14.pdf

[8] Linux 2.6.12 kernel source。

[9] Linux kernel documentation, Documentation/RCU/*。

linux内核 RCU机制详解

 简介

        RCU(Read-Copy Update)是数据同步的一种方式,在当前的Linux内核中发挥着重要的作用。RCU主要针对的数据对象是链表,目的是提高遍历读取数据的效率,为了达到目的使用RCU机制读取数据的时候不对链表进行耗时的加锁操作。这样在同一时间可以有多个线程同时读取该链表,并且允许一个线程对链表进行修改(修改的时候,需要加锁)。RCU适用于需要频繁的读取数据,而相应修改数据并不多的情景,例如在文件系统中,经常需要查找定位目录,而对目录的修改相对来说并不多,这就是RCU发挥作用的最佳场景。

       Linux内核源码当中,关于RCU的文档比较齐全,你可以在 /Documentation/RCU/ 目录下找到这些文件。Paul E. McKenney 是内核中RCU源码的主要实现者,他也写了很多RCU方面的文章。他把这些文章和一些关于RCU的论文的链接整理到了一起。http://www2.rdrop.com/users/paulmck/RCU/

       在RCU的实现过程中,我们主要解决以下问题:

       1,在读取过程中,另外一个线程删除了一个节点。删除线程可以把这个节点从链表中移除,但它不能直接销毁这个节点,必须等到所有的读取线程读取完成以后,才进行销毁操作。RCU中把这个过程称为宽限期(Grace period)。

       2,在读取过程中,另外一个线程插入了一个新节点,而读线程读到了这个节点,那么需要保证读到的这个节点是完整的。这里涉及到了发布-订阅机制(Publish-Subscribe Mechanism)。

       3, 保证读取链表的完整性。新增或者删除一个节点,不至于导致遍历一个链表从中间断开。但是RCU并不保证一定能读到新增的节点或者不读到要被删除的节点。

       宽限期

        通过例子,方便理解这个内容。以下例子修改于Paul的文章。

  1. struct foo {  
  2.            int a;  
  3.            char b;  
  4.            long c;  
  5.  };  
  6.   
  7. DEFINE_SPINLOCK(foo_mutex);  
  8.   
  9. struct foo *gbl_foo;  
  10.   
  11. void foo_read (void)  
  12. {  
  13.      foo *fp = gbl_foo;  
  14.      if ( fp != NULL )  
  15.             dosomething(fp->a, fp->b , fp->c );  
  16. }  
  17.   
  18. void foo_update( foo* new_fp )  
  19. {  
  20.      spin_lock(&foo_mutex);  
  21.      foo *old_fp = gbl_foo;  
  22.      gbl_foo = new_fp;  
  23.      spin_unlock(&foo_mutex);  
  24.      kfee(old_fp);  
  25. }  
  1. struct foo {  
  2.            int a;  
  3.            char b;  
  4.            long c;  
  5.  };  
  6.   
  7. DEFINE_SPINLOCK(foo_mutex);  
  8.   
  9. struct foo *gbl_foo;  
  10.   
  11. void foo_read (void)  
  12. {  
  13.      foo *fp = gbl_foo;  
  14.      if ( fp != NULL )  
  15.             dosomething(fp->a, fp->b , fp->c );  
  16. }  
  17.   
  18. void foo_update( foo* new_fp )  
  19. {  
  20.      spin_lock(&foo_mutex);  
  21.      foo *old_fp = gbl_foo;  
  22.      gbl_foo = new_fp;  
  23.      spin_unlock(&foo_mutex);  
  24.      kfee(old_fp);  
  25. }  

      如上的程序,是针对于全局变量gbl_foo的操作。假设以下场景。有两个线程同时运行 foo_ read和foo_update的时候,当foo_ read执行完赋值操作后,线程发生切换;此时另一个线程开始执行foo_update并执行完成。当foo_ read运行的进程切换回来后,运行dosomething 的时候,fp已经被删除,这将对系统造成危害。为了防止此类事件的发生,RCU里增加了一个新的概念叫宽限期(Grace period)。如下图所示:

         图中每行代表一个线程,最下面的一行是删除线程,当它执行完删除操作后,线程进入了宽限期。宽限期的意义是,在一个删除动作发生后,它必须等待所有在宽限期开始前已经开始的读线程结束,才可以进行销毁操作。这样做的原因是这些线程有可能读到了要删除的元素。图中的宽限期必须等待1和2结束;而读线程5在宽限期开始前已经结束,不需要考虑;而3,4,6也不需要考虑,因为在宽限期结束后开始后的线程不可能读到已删除的元素。为此RCU机制提供了相应的API来实现这个功能。                             

  1. void foo_read(void)  
  2. {  
  3.     rcu_read_lock();  
  4.     foo *fp = gbl_foo;  
  5.     if ( fp != NULL )  
  6.             dosomething(fp->a,fp->b,fp->c);  
  7.     rcu_read_unlock();  
  8. }  
  9.   
  10. void foo_update( foo* new_fp )  
  11. {  
  12.     spin_lock(&foo_mutex);  
  13.     foo *old_fp = gbl_foo;  
  14.     gbl_foo = new_fp;  
  15.     spin_unlock(&foo_mutex);  
  16.     synchronize_rcu();  
  17.     kfee(old_fp);  
  18. }  
  1. void foo_read(void)  
  2. {  
  3.     rcu_read_lock();  
  4.     foo *fp = gbl_foo;  
  5.     if ( fp != NULL )  
  6.             dosomething(fp->a,fp->b,fp->c);  
  7.     rcu_read_unlock();  
  8. }  
  9.   
  10. void foo_update( foo* new_fp )  
  11. {  
  12.     spin_lock(&foo_mutex);  
  13.     foo *old_fp = gbl_foo;  
  14.     gbl_foo = new_fp;  
  15.     spin_unlock(&foo_mutex);  
  16.     synchronize_rcu();  
  17.     kfee(old_fp);  
  18. }  

      其中foo_read中增加了rcu_read_lock和rcu_read_unlock,这两个函数用来标记一个RCU读过程的开始和结束。其实作用就是帮助检测宽限期是否结束。foo_update增加了一个函数synchronize_rcu(),调用该函数意味着一个宽限期的开始,而直到宽限期结束,该函数才会返回。我们再对比着图看一看,线程1和2,在synchronize_rcu之前可能得到了旧的gbl_foo,也就是foo_update中的old_fp,如果不等它们运行结束,就调用kfee(old_fp),极有可能造成系统崩溃。而3,4,6在synchronize_rcu之后运行,此时它们已经不可能得到old_fp,此次的kfee将不对它们产生影响。

     宽限期是RCU实现中最复杂的部分,原因是在提高读数据性能的同时,删除数据的性能也不能太差。


     订阅——发布机制 

      当前使用的编译器大多会对代码做一定程度的优化,CPU也会对执行指令做一些优化调整,目的是提高代码的执行效率,但这样的优化,有时候会带来不期望的结果。如例:

  1. void foo_update( foo* new_fp )  
  2. {  
  3.     spin_lock(&foo_mutex);  
  4.     foo *old_fp = gbl_foo;  
  5.       
  6.     new_fp->a = 1;  
  7.     new_fp->b = ‘b’;  
  8.     new_fp->c = 100;  
  9.       
  10.     gbl_foo = new_fp;  
  11.     spin_unlock(&foo_mutex);  
  12.     synchronize_rcu();  
  13.     kfee(old_fp);  
  14. }  
  1. void foo_update( foo* new_fp )  
  2. {  
  3.     spin_lock(&foo_mutex);  
  4.     foo *old_fp = gbl_foo;  
  5.       
  6.     new_fp->a = 1;  
  7.     new_fp->b = ‘b’;  
  8.     new_fp->c = 100;  
  9.       
  10.     gbl_foo = new_fp;  
  11.     spin_unlock(&foo_mutex);  
  12.     synchronize_rcu();  
  13.     kfee(old_fp);  
  14. }  

       这段代码中,我们期望的是6,7,8行的代码在第10行代码之前执行。但优化后的代码并不对执行顺序做出保证。在这种情形下,一个读线程很可能读到 new_fp,但new_fp的成员赋值还没执行完成。当读线程执行dosomething(fp->a, fp->b , fp->c ) 的 时候,就有不确定的参数传入到dosomething,极有可能造成不期望的结果,甚至程序崩溃。可以通过优化屏障来解决该问题,RCU机制对优化屏障做了包装,提供了专用的API来解决该问题。这时候,第十行不再是直接的指针赋值,而应该改为 :

       rcu_assign_pointer(gbl_foo,new_fp);

       rcu_assign_pointer的实现比较简单,如下:

      <include/linux/rcupdate.h>

  1. #define rcu_assign_pointer(p, v)    
  2.          __rcu_assign_pointer((p), (v), __rcu)  
  3.   
  4. #define __rcu_assign_pointer(p, v, space)    
  5.          do {   
  6.                  smp_wmb();   
  7.                  (p) = (typeof(*v) __force space *)(v);   
  8.          } while (0)  
  1. #define rcu_assign_pointer(p, v)   
  2.          __rcu_assign_pointer((p), (v), __rcu)  
  3.   
  4. #define __rcu_assign_pointer(p, v, space)   
  5.          do {   
  6.                  smp_wmb();   
  7.                  (p) = (typeof(*v) __force space *)(v);   
  8.          } while (0)  

      我们可以看到它的实现只是在赋值之前加了优化屏障 smp_wmb来确保代码的执行顺序。另外就是宏中用到的__rcu,只是作为编译过程的检测条件来使用的。

      在DEC Alpha CPU机器上还有一种更强悍的优化,如下所示:

  1. void foo_read(void)  
  2. {         
  3.     rcu_read_lock();  
  4.     foo *fp = gbl_foo;  
  5.     if ( fp != NULL )  
  6.         dosomething(fp->a, fp->b ,fp->c);  
  7.     rcu_read_unlock();  
  8. }  
  1. void foo_read(void)  
  2. {         
  3.     rcu_read_lock();  
  4.     foo *fp = gbl_foo;  
  5.     if ( fp != NULL )  
  6.         dosomething(fp->a, fp->b ,fp->c);  
  7.     rcu_read_unlock();  
  8. }  

      第六行的 fp->a,fp->b,fp->c会在第3行还没执行的时候就预先判断运行,当他和foo_update同时运行的时候,可能导致传入dosomething的一部分属于旧的gbl_foo,而另外的属于新的。这样导致运行结果的错误。为了避免该类问题,RCU还是提供了宏来解决该问题:

<include/linux/rcupdate.h>

  1. #define rcu_dereference(p) rcu_dereference_check(p, 0)   
  2.   
  3.   
  4. #define rcu_dereference_check(p, c)    
  5.          __rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)  
  6.   
  7. #define __rcu_dereference_check(p, c, space)    
  8.          ({   
  9.                  typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p);   
  10.                  rcu_lockdep_assert(c, "suspicious rcu_dereference_check()"   
  11.                                        " usage");   
  12.                  rcu_dereference_sparse(p, space);   
  13.                  smp_read_barrier_depends();   
  14.                  ((typeof(*p) __force __kernel *)(_________p1));   
  15.          })  
  16.   
  17. static inline int rcu_read_lock_held(void)  
  18. {  
  19.          if (!debug_lockdep_rcu_enabled())  
  20.                  return 1;  
  21.          if (rcu_is_cpu_idle())  
  22.                  return 0;  
  23.          if (!rcu_lockdep_current_cpu_online())  
  24.                  return 0;  
  25.          return lock_is_held(&rcu_lock_map);  
  26. }  
  1. #define rcu_dereference(p) rcu_dereference_check(p, 0)  
  2.   
  3.   
  4. #define rcu_dereference_check(p, c)   
  5.          __rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)  
  6.   
  7. #define __rcu_dereference_check(p, c, space)   
  8.          ({   
  9.                  typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p);   
  10.                  rcu_lockdep_assert(c, "suspicious rcu_dereference_check()"   
  11.                                        " usage");   
  12.                  rcu_dereference_sparse(p, space);   
  13.                  smp_read_barrier_depends();   
  14.                  ((typeof(*p) __force __kernel *)(_________p1));   
  15.          })  
  16.   
  17. static inline int rcu_read_lock_held(void)  
  18. {  
  19.          if (!debug_lockdep_rcu_enabled())  
  20.                  return 1;  
  21.          if (rcu_is_cpu_idle())  
  22.                  return 0;  
  23.          if (!rcu_lockdep_current_cpu_online())  
  24.                  return 0;  
  25.          return lock_is_held(&rcu_lock_map);  
  26. }  

       这段代码中加入了调试信息,去除调试信息,可以是以下的形式(其实这也是旧版本中的代码):

  1. #define rcu_dereference(p)     ({    
  2.                     typeof(p) _________p1 = p;   
  3.                     smp_read_barrier_depends();   
  4.                     (_________p1);   
  5.                     })  
  1. #define rcu_dereference(p)     ({   
  2.                     typeof(p) _________p1 = p;   
  3.                     smp_read_barrier_depends();   
  4.                     (_________p1);   
  5.                     })  

       在赋值后加入优化屏障smp_read_barrier_depends()。

        我们之前的第四行代码改为 foo *fp = rcu_dereference(gbl_foo);,就可以防止上述问题。

       数据读取的完整性

        还是通过例子来说明这个问题:

       

       如图我们在原list中加入一个节点new到A之前,所要做的第一步是将new的指针指向A节点,第二步才是将Head的指针指向new。这样做的目的是当插入操作完成第一步的时候,对于链表的读取并不产生影响,而执行完第二步的时候,读线程如果读到new节点,也可以继续遍历链表。如果把这个过程反过来,第一步head指向new,而这时一个线程读到new,由于new的指针指向的是Null,这样将导致读线程无法读取到A,B等后续节点。从以上过程中,可以看出RCU并不保证读线程读取到new节点。如果该节点对程序产生影响,那么就需要外部调用做相应的调整。如在文件系统中,通过RCU定位后,如果查找不到相应节点,就会进行其它形式的查找,相关内容等分析到文件系统的时候再进行叙述。

      我们再看一下删除一个节点的例子:

     如图我们希望删除B,这时候要做的就是将A的指针指向C,保持B的指针,然后删除程序将进入宽限期检测。由于B的内容并没有变更,读到B的线程仍然可以继续读取B的后续节点。B不能立即销毁,它必须等待宽限期结束后,才能进行相应销毁操作。由于A的节点已经指向了C,当宽限期开始之后所有的后续读操作通过A找到的是C,而B已经隐藏了,后续的读线程都不会读到它。这样就确保宽限期过后,删除B并不对系统造成影响。

     小结

       RCU的原理并不复杂,应用也很简单。但代码的实现确并不是那么容易,难点都集中在了宽限期的检测上,后续分析源代码的时候,我们可以看到一些极富技巧的实现方式。

Linux内核RCU机制的实现

一:前言
RCU机制出现的比较早,只是在linux kernel中一直到2.5版本的时候才被采用.关于RCU机制,这里就不做过多的介绍了,网上有很多有关RCU介绍和使用的文档.请自行查阅.本文主要是从linux kernel源代码的角度.来分析RCU的实现.
在讨论RCU的实现之前.有必要重申以下几点:
1:RCU使用在读者多而写者少的情况.RCU和读写锁相似.但RCU的读者占锁没有任何的系统开销.写者与写写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源.
2:RCU保护的是指针.这一点尤其重要.因为指针赋值是一条单指令.也就是说是一个原子操作.因它更改指针指向没必要考虑它的同步.只需要考虑cache的影响.
3:读者是可以嵌套的.也就是说rcu_read_lock()可以嵌套调用.
4:读者在持有rcu_read_lock()的时候,不能发生进程上下文切换.否则,因为写者需要要等待读者完成,写者进程也会一直被阻塞.
以下的代码是基于linux kernel 2.6.26
二:使用RCU的实例
Linux kernel中自己附带有详细的文档来介绍RCU,这些文档位于linux-2.6.26.3/Documentation/RCU. 这些文档值得多花点时间去仔细研读一下.
下面以whatisRCU.txt中的例子作为今天分析的起点:
struct foo {
        int a;
        char b;
        long c;
};
DEFINE_SPINLOCK(foo_mutex);
 
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
        struct foo *new_fp;
        struct foo *old_fp;
 
        new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
        spin_lock(&foo_mutex);
        old_fp = gbl_foo;
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
        spin_unlock(&foo_mutex);
        synchronize_rcu();
        kfree(old_fp);
}
 
int foo_get_a(void)
{
        int retval;
 
        rcu_read_lock();
        retval = rcu_dereference(gbl_foo)->a;
        rcu_read_unlock();
        return retval;
}
如上代码所示,RCU被用来保护全局指针struct foo *gbl_foo. foo_get_a()用来从RCU保护的结构中取得gbl_foo的值.而foo_update_a()用来更新被RCU保护的gbl_foo的值.
另外,我们思考一下,为什么要在foo_update_a()中使用自旋锁foo_mutex呢?
假设中间没有使用自旋锁.那foo_update_a()的代码如下:
 
void foo_update_a(int new_a)
{
        struct foo *new_fp;
        struct foo *old_fp;
 
        new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
       
        old_fp = gbl_foo;
        1:-------------------------    
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
       
        synchronize_rcu();
        kfree(old_fp);
}
假设A进程在上图----标识处被B进程抢点.B进程也执行了goo_ipdate_a().等B执行完后,再切换回A进程.此时,A进程所持的old_fd实际上已经被B进程给释放掉了.此后A进程对old_fd的操作都是非法的.
 
另外,我们在上面也看到了几个有关RCU的核心API.它们为别是:
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()
其中,rcu_read_lock()和rcu_read_unlock()用来保持一个读者的RCU临界区.在该临界区内不允许发生上下文切换.
rcu_dereference():读者调用它来获得一个被RCU保护的指针.
Rcu_assign_pointer():写者使用该函数来为被RCU保护的指针分配一个新的值.这样是为了安全从写者到读者更改其值.这个函数会返回一个新值
 
三:RCU API实现分析
Rcu_read_lock()和rcu_read_unlock()的实现如下:
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()
 
#define __rcu_read_lock()
    do {
        preempt_disable();
        __acquire(RCU);
        rcu_read_acquire();
    } while (0)
#define __rcu_read_unlock()
    do {
        rcu_read_release();
        __release(RCU);
        preempt_enable();
    } while (0)
其中__acquire(),rcu_read_read_acquire(),rcu_read_release(), rcu_read_release()都是一些选择编译函数,可以忽略不可看.因此可以得知.rcu_read_lock(), rcu_read_unlock()只是禁止和启用抢占.因为在读者临界区,不允许发生上下文切换.
 
rcu_dereference()和rcu_assign_pointer()的实现如下:
#define rcu_dereference(p)     ({
                typeof(p) _________p1 = ACCESS_ONCE(p);
                smp_read_barrier_depends();
                (_________p1);
                })
#define rcu_assign_pointer(p, v)
    ({
        if (!__builtin_constant_p(v) ||
            ((v) != NULL))
            smp_wmb();
        (p) = (v);
    })
它们的实现也很简单.因为它们本身都是原子操作.因为只是为了cache一致性,插上了内存屏障.可以让其它的读者/写者可以看到保护指针的最新值.
 
synchronize_rcu()在RCU中是一个最核心的函数,它用来等待之前的读者全部退出.我们后面的大部份分析也是围绕着它而进行.实现如下:
void synchronize_rcu(void)
{
    struct rcu_synchronize rcu;
 
    init_completion(&rcu.completion);
    /* Will wake me after RCU finished */
    call_rcu(&rcu.head, wakeme_after_rcu);
 
    /* Wait for it */
    wait_for_completion(&rcu.completion);
}
我们可以看到,它初始化了一个本地变量,它的类型为struct rcu_synchronize.调用call_rcu()之后.一直等待条件变量rcu.competion的满足.
 
在这里看到了RCU的另一个核心API,它就是call_run().它的定义如下:
void call_rcu(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
 
它用来等待之前的读者操作完成之后,就会调用函数func.
我们也可以看到,在synchronize_rcu()中,读者操作完了要调用的函数就是wakeme_after_rcu().
另外,call_rcu()用在不可睡眠的条件中,如果中断环境,禁止抢占环境等.而synchronize_rcu()用在可睡眠的环境下.先跟踪看一下wakeme_after_rcu():
static void wakeme_after_rcu(struct rcu_head  *head)
{
    struct rcu_synchronize *rcu;
 
    rcu = container_of(head, struct rcu_synchronize, head);
    complete(&rcu->completion);
}
我们可以看到,该函数将条件变量置真,然后唤醒了在条件变量上等待的进程.
 
看下call_rcu():
void call_rcu(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;
 
    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
    if (unlikely(++rdp->qlen > qhimark)) {
        rdp->blimit = INT_MAX;
        force_quiescent_state(rdp, &rcu_ctrlblk);
    }
    local_irq_restore(flags);
}
该函数也很简单,就是将head加在了per_cpu变量rcu_data的tail链表上.
Rcu_data定义如下:
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
由此,我们可以得知,每一个CPU都有一个rcu_data.每个调用call_rcu()/synchronize_rcu()进程所代表的head都会挂到rcu_data的tail链表上.
 
那究竟怎么去判断当前的写(应该是读者,作者可能是笔误了)者已经操作完了呢?
我们在之前看到,不是读者在调用rcu_read_lock()的时候要禁止抢占么?因此,我们只需要判断如有的CPU都进过了一次上下文切换,就说明所有读者已经退出了.
引用<< Linux 2.6内核中新的锁机制--RCU >>( (http://www.ibm.com/developerworks/cn/linux/l-rcu/)中有关这个过程的描述:
“等待适当时机的这一时期称为grace period,而CPU发生了上下文切换称为经历一个quiescent state,grace period就是所有CPU都经历一次quiescent state所需要的等待的时间。垃圾收集器就是在grace period之后调用写者注册的回调函数来完成真正的数据修改或数据释放操作的”
 
要彻底弄清楚这个问题,我们得从RCU的初始化说起.
四:从RCU的初始化说起
RCU的初始化位于start_kernel()àrcu_init().代码如下:
void __init rcu_init(void)
{
    __rcu_init();
}
 
void __init __rcu_init(void)
{
    rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
            (void *)(long)smp_processor_id());
    /* Register notifier for non-boot CPUs */
    register_cpu_notifier(&rcu_nb);
}
Reqister_cpu_notifier()是关于通知链表的操作,可以忽略不看.
跟进rcu_cpu_notify():
static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                unsigned long action, void *hcpu)
{
    long cpu = (long)hcpu;
 
    switch (action) {
    case CPU_UP_PREPARE:
    case CPU_UP_PREPARE_FROZEN:
        rcu_online_cpu(cpu);
        break;
    case CPU_DEAD:
    case CPU_DEAD_FROZEN:
        rcu_offline_cpu(cpu);
        break;
    default:
        break;
    }
    return NOTIFY_OK;
}
注意到,在__rcu_init()中是以CPU_UP_PREPARE为参数调用此函数,对应流程转入rcu_online_cpu中:
static void __cpuinit rcu_online_cpu(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
    struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
 
    rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
    rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
    open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}
我们从这里又看到了另一个per_cpu变量,rcu_bh_data.有关bh的部份之后再来分析.在这里略过这些部份.
Rcu_init_percpu_data()如下:
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
                        struct rcu_data *rdp)
{
    memset(rdp, 0, sizeof(*rdp));
    rdp->curtail = &rdp->curlist;
    rdp->nxttail = &rdp->nxtlist;
    rdp->donetail = &rdp->donelist;
    rdp->quiescbatch = rcp->completed;
    rdp->qs_pending = 0;
    rdp->cpu = cpu;
    rdp->blimit = blimit;
}
调用这个函数的第二个参数是一个全局变量rcu_ctlblk.定义如下:
static struct rcu_ctrlblk rcu_ctrlblk = {
    .cur = -300,
    .completed = -300,
    .lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
    .cpumask = CPU_MASK_NONE,
};
static struct rcu_ctrlblk rcu_bh_ctrlblk = {
    .cur = -300,
    .completed = -300,
    .lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
    .cpumask = CPU_MASK_NONE,
};
在rcu_init_percpu_data中,初始化了三个链表,分别是taillist,curlist和donelist.另外, 将rdp->quiescbatch 赋值为 rcp->completed.这个是一个很重要的操作.
Rdp-> quiescbatch表示rcu_data已经完成的grace period序号(在代码中也被称为了batch),rcp->completed表示全部(应该是全局变量,作者笔误)变量  rcu_ctrlblk 计数已经完成的grace period序号.将rdp->quiescbatch = rcp->completed;,表示不需要等待grace period.
 
回到rcu_online_cpu()中:
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
初始化了RCU_SOFTIRQ类型的软中断.但这个软中断什么时候被打开,还需要之后来分析.
之后,每个CPU的初始化都会经过start_kernel()->rcu_init().相应的,也为每个CPU初始化了RCU的相关结构.
 
五:等待RCU读者操作完成
之前,我们看完了RCU的初始化,现在可以来看一下RCU如何来判断当前的RCU读者已经退出了.
在每一次进程切换的时候,都会调用rcu_qsctr_inc().如下代码片段如示:
asmlinkage void __sched schedule(void)
{
    ......
    ......
    rcu_qsctr_inc(cpu);
    ......
}
Rcu_qsctr_inc()代码如下:
static inline void rcu_qsctr_inc(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
    rdp->passed_quiesc = 1;
}
该函数将对应CPU上的rcu_data的passed_quiesc成员设为了1.
或许你已经发现了,这个过程就标识该CPU经过了一次quiescent state.没错:-)
 
另外,在时钟中断中,会进行以下操作:
void update_process_times(int user_tick)
{
    ......
    ......
 
    if (rcu_pending(cpu))
        rcu_check_callbacks(cpu, user_tick);
    ......
    ......
}
在每一次时钟中断,都会检查是否有需要更新的RCU需要处理,如果有,就会为其调用rcu_check_callbacks().
 
Rcu_pending()的代码如下:
int rcu_pending(int cpu)
{
    return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
        __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}
同上面一样,忽略bh的部份.
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    /* This cpu has pending rcu entries and the grace period
     * for them has completed.
     */
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
        return 1;
 
    /* This cpu has no pending entries, but there are new entries */
    if (!rdp->curlist && rdp->nxtlist)
        return 1;
 
    /* This cpu has finished callbacks to invoke */
    if (rdp->donelist)
        return 1;
 
    /* The rcu core waits for a quiescent state from the cpu */
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;
 
    /* nothing to do */
    return 0;
}
上面有四种情况会返回1,分别对应:
1:该CPU上有等待处理的回调函数,且已经经过了一个batch(grace period).rdp->datch表示rdp在等待的batch序号
2:上一个等待已经处理完了,又有了新注册的回调函数.
3:等待已经完成,但尚末调用该次等待的回调函数.
4:在等待quiescent state.
关于rcp和rdp结构中成员的含义,我们等用到的时候再来分析.
 
如果rcu_pending返回1,就会进入到rcu_check_callbacks().代码如下:
void rcu_check_callbacks(int cpu, int user)
{
    if (user ||
        (idle_cpu(cpu) && !in_softirq() &&
                hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
        rcu_qsctr_inc(cpu);
        rcu_bh_qsctr_inc(cpu);
    } else if (!in_softirq())
        rcu_bh_qsctr_inc(cpu);
    raise_rcu_softirq();
}
如果已经CPU中运行的进程是用户空间进程或者是CPU空闲且不处于中断环境,那么,它也已经进过了一次切换.注意,RCU只能在内核空间使用.
最后调用raise_rcu_softirq()打开了软中断处理.相应的,也就调用RCU的软中断处理函数.结合上面分析的初始化流程,软中断的处理函数为rcu_process_callbacks().
代码如下:
static void rcu_process_callbacks(struct softirq_action *unused)
{
    __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
    __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
}
 
在阅读__rcu_process_callbacks()之前,先来了解一下rdp中几个链表的含义:
每次新注册的回调函数,都会链入到rdp->taillist.
当前等待grace period完成的函数都会链入到rdp->curlist上.
到等待的grace period已经到来,就会将curlist上的链表移到donelist上.
当一个grace period过了之后,就会将taillist上的数据移到rdp->curlist上.之后加册的回调函数又会将其加到rdp->taillist上.
 
__rcu_process_callbacks()代码分段分析如下:
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
        *rdp->donetail = rdp->curlist;
        rdp->donetail = rdp->curtail;
        rdp->curlist = NULL;
        rdp->curtail = &rdp->curlist;
    }
 
    如果有需要处理的回调函数,且已经经过了一次grace period.就将curlist上的数据移到donetlist上.
其中,crp->completed表示已经完成的grace period.rdp->batch表示该CPU正在等待的grace period序号.
 
    if (rdp->nxtlist && !rdp->curlist) {
        local_irq_disable();
        rdp->curlist = rdp->nxtlist;
        rdp->curtail = rdp->nxttail;
        rdp->nxtlist = NULL;
        rdp->nxttail = &rdp->nxtlist;
        local_irq_enable();
 
        /*
         * start the next batch of callbacks
         */
 
        /* determine batch number */
        rdp->batch = rcp->cur + 1;
        /* see the comment and corresponding wmb() in
         * the rcu_start_batch()
         */
        smp_rmb();
 
        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    }
如果上一个等待的回调函数处理完了,而且又有了新注册的回调函数.就将taillist上的数据移动到curlist上.并开启新的grace period等待.
注意里面几个变量的赋值:
rdp->batch = rcp->cur + 1表示该CPU等待的grace period置为当前已发生grace period序号的下一个.
每次启动一个新的grace period等待之后,就会将rcp->next_pending.在启动的过程中,也就是rcu_start_batch()的过程中,会将rcp->next_pending置为1.设置这个变量主要是防止多个写者竞争的情况
 
    //更新相关信息
    rcu_check_quiescent_state(rcp, rdp);
    //处理等待完成的回调函数
    if (rdp->donelist)
        rcu_do_batch(rdp);
}
接着,更新相关的信息,例如,判断当前CPU是否进行了quiescent state.或者grace period是否已经完成.
最后再处理挂在rdp->donelist上的链表.
 
这里面有几个子函数值得好好分析,分别分析如下:
第一个要分析的是rcu_start_batch():
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
    if (rcp->next_pending &&
            rcp->completed == rcp->cur) {
        rcp->next_pending = 0;
        smp_wmb();
        rcp->cur++;
        smp_mb();
        cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
 
        rcp->signaled = 0;
    }
}
这个函数的代码虽然很简单,但隐藏了很多玄机.
每次启动一个新的grace period等待的时候就将rcp->cur加1,将rcp->cpumask中,将存在的CPU的位置1.
其中,if判断必须要满足二个条件:
第一:rcp->next_pending必须为1.我们把这个函数放到__rcu_process_callbacks()这个大环境中看一下:
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    ......
    ......
    if (rdp->nxtlist && !rdp->curlist) {
        ......
        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    }
}
首先,rcp->next_pending为0才会调用rcu_start_batch()启动一个新的进程.然后,将rcp-> next_pending置为1,再调用rcu_start_batch().在这里要注意中间的自旋锁.然后在rcu_start_batch()中, 再次判断rcp->next_pending为1后,再进行后续操作,并将rcp->next_pending置为0.
为什么这里需要这样的判断呢? 如果其它CPU正在开启一个新的grace period等待,那就用不着再次开启一个新的等待了,直接返回即可.
 
第二: rcu_start_batch()中 if要满足的第二个条件为rcp->completed == rcp->cur.也就是说前面的grace period全部都完成了.每次开启新等待的时候都会将rcp->cur加1.每一个等待完成之后,都会将rc-> completed等于rcp->cur.
 
第二个要分析的函数是rcu_check_quiescent_state().代码如下:
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    if (rdp->quiescbatch != rcp->cur) {
        /* start new grace period: */
        rdp->qs_pending = 1;
        rdp->passed_quiesc = 0;
        rdp->quiescbatch = rcp->cur;
        return;
    }
 
    /* Grace period already completed for this cpu?
     * qs_pending is checked instead of the actual bitmap to avoid
     * cacheline trashing.
     */
    if (!rdp->qs_pending)
        return;
 
    /*
     * Was there a quiescent state since the beginning of the grace
     * period? If no, then exit and wait for the next call.
     */
    if (!rdp->passed_quiesc)
        return;
    rdp->qs_pending = 0;
 
    spin_lock(&rcp->lock);
    /*
     * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
     * during cpu startup. Ignore the quiescent state.
     */
    if (likely(rdp->quiescbatch == rcp->cur))
        cpu_quiet(rdp->cpu, rcp);
 
    spin_unlock(&rcp->lock);
}
首先,如果rdp->quiescbatch != rcp->cur.则说明又开启了一个新的等待,因此需要重新处理这个等待,首先将rdp->quiescbatch 更新为rcp->cur.然后,使rdp->qs_pending为1.表示有等待需要处理. passed_quiesc也被清成了0.
然后,再判断rdp->passed_quiesc是否为真,记得我们在之前分析过,在每次进程切换或者进程切换的时候,都会调用rcu_qsctr_inc().该函数会将rdp->passed_quiesc置为1.
因此,在这里判断这个值是为了检测该CPU上是否发生了上下文切换.
之后,就是一段被rcp->lock保护的一段区域.如果还是等待没有发生改变,就会调用cpu_quiet(rdp->cpu, rcp)将该CPU位清零.如果是一个新的等待了,就用不着清了,因为需要重新判断该CPU上是否发生了上下文切换.
 
cpu_quiet()函数代码如下:
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
    cpu_clear(cpu, rcp->cpumask);
    if (cpus_empty(rcp->cpumask)) {
        /* batch completed ! */
        rcp->completed = rcp->cur;
        rcu_start_batch(rcp);
    }
}
它清除当前CPU对应的位,如果CPMMASK为空,对应所有的CPU都发生了进程切换,就会将rcp->completed = rcp->cur.并且根据需要是否开始一个grace period等待.
 
最后一个要分析的函数是rcu_do_batch().它进行的是清尾的工作.如果等待完成了,那就必须要处理donelist链表上挂载的数据了.代码如下:
static void rcu_do_batch(struct rcu_data *rdp)
{
    struct rcu_head *next, *list;
    int count = 0;
 
    list = rdp->donelist;
    while (list) {
        next = list->next;
        prefetch(next);
        list->func(list);
        list = next;
        if (++count >= rdp->blimit)
            break;
    }
    rdp->donelist = list;
 
    local_irq_disable();
    rdp->qlen -= count;
    local_irq_enable();
    if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
        rdp->blimit = blimit;
 
    if (!rdp->donelist)
        rdp->donetail = &rdp->donelist;
    else
        raise_rcu_softirq();
}
它遍历处理挂在链表上的回调函数.在这里,注意每次调用的回调函数有最大值限制.这样做主要是防止一次调用过多的回调函数而产生不必要系统负载.如果donelist中还有没处理完的数据,打开RCU软中断,在下次软中断到来的时候接着处理.
 
五:几种RCU情况分析
1:如果CPU 1上有进程调用rcu_read_lock进入临界区,之后退出来,发生了进程切换,新进程又通过rcu_read?_lock进入临界区.由于RCU软中断中只判断一次上下文切换,因此,在调用回调函数的时候,仍然有进程处于RCU的读临界区,这样会不会有问题呢?
这样是不会有问题的.还是上面的例子:
        spin_lock(&foo_mutex);
        old_fp = gbl_foo;
        *new_fp = *old_fp;
        new_fp->a = new_a;
        rcu_assign_pointer(gbl_foo, new_fp);
        spin_unlock(&foo_mutex);
        synchronize_rcu();
        kfree(old_fp);
 
使用synchronize_rcu ()只是为了等待持有old_fd(也就是调用rcu_assign_pointer ()更新之前的gbl_foo)的进程退出.而不需要等待所有的读者全部退出.这是因为,在rcu_assign_pointer ()之后的读取取得的保护指针,已经是更新好的新值了.
 
2:上面分析的似乎是针对有挂载链表的CPU而言的,那对于只调用rcu_read_lock()的CPU,它们是怎么处理的呢?
首先,每次启动一次等待,肯定是会更新rcp->cur的.因此,在rcu_pending()的判断中,下面语句会被满足:
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;
因此会进入到RCU的软中断.在软中断处理中:
rcu_process_callbacks() à __rcu_process_callbacks() àrcu_check_quiescent_state()
中,如果该CPU上有进程切换,就会各新rcp中的CPU 掩码数组.
 
3:如果一个CPU连续调用synchronize_rcu()或者call_rcu()它们会有什么影响呢?
如果当前有请求在等待,就会新请提交的回调函数挂到taillist上,一直到前一个等待完成,再将taillist的数据移到 curlist,并开启一个新的等待,因此,也就是说,在前一个等待期间提交的请求,都会放到一起处理.也就是说,他们会共同等待所有CPU切换完成.
举例说明如下:
假设grace period时间是12ms.在12ms内,先后有A,B,C进程提交请求.
那系统在等待处理完后,交A,B,C移到curlist中,开始一个新的等待.
 
六:有关rcu_read_lock_bh()/rcu_read_unlock_bh()/call_rcu_bh().
在上面的代码分析的时候,经常看到带有bh的RCU代码.现在来看一下这些带bh的RCU是什么样的.
#define rcu_read_lock_bh() __rcu_read_lock_bh()
#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
 
#define __rcu_read_lock_bh()
    do {
        local_bh_disable();
        __acquire(RCU_BH);
        rcu_read_acquire();
    } while (0)
#define __rcu_read_unlock_bh()
    do {
        rcu_read_release();
        __release(RCU_BH);
        local_bh_enable();
    } while (0)
根据上面的分析:bh RCU跟普通的RCU相比不同的是,普通RCU是禁止内核抢占,而bh RCU是禁止下半部.
其实,带bh的RCU一般在软中断使用,不过计算quiescent state并不是发生一次上下文切换.而是发生一次softirq.我们在后面的分析中可得到印证.
 
Call_rcu_bh()代码如下:
void call_rcu_bh(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;
 
    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_bh_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
 
    if (unlikely(++rdp->qlen > qhimark)) {
        rdp->blimit = INT_MAX;
        force_quiescent_state(rdp, &rcu_bh_ctrlblk);
    }
 
    local_irq_restore(flags);
}
它跟call_rcu()不相同的是,rcu是取per_cpu变量rcu__data和全局变量rcu_ctrlblk.而bh RCU是取rcu_bh_data,rcu_bh_ctrlblk.他们的类型都是一样的,这样做只是为了区分BH和普通RCU的等待.
 
对于rcu_bh_qsctr_inc
static inline void rcu_bh_qsctr_inc(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
    rdp->passed_quiesc = 1;
}
它跟rcu_qsctr_inc()机同,也是更改对应成员.
 
所不同的是,调用rcu_bh_qsctr_inc()的地方发生了变化.
asmlinkage void __do_softirq(void)
{
    ......
        do {
        if (pending & 1) {
            h->action(h);
            rcu_bh_qsctr_inc(cpu);
        }
        h++;
        pending >>= 1;
    } while (pending);
    ......
}
也就是说,在发生软中断的时候,才会认为是经过了一次quiescent state.
 
七:小结
基本上,RCU的代码架构不难,难的是对它的临界条件分析.作者的思路也相当缜密.值得花时间仔细研读.
 
 
 
 
 
 

RCU 机制

《RCU是什么?》第一部分

概述

Read-copy update (RCU) 是一种 2002 年 10 月被引入到内核当中的同步机制。通过允许在更新的同时读数据,RCU 提高了同步机制的可伸缩性(scalability)。相对于传统的在并发线程间不区分是读者还是写者的简单互斥性锁机制,或者是哪些允许并发读但同时不 允许写的读写锁,RCU 支持同时一个更新线程和多个读线程的并发。RCU 通过保存对象的多个副本来保障读操作的连续性,并保证在预定的读方临界区没有完成之前不会释放这个对象。RCU定义并使用高效、可伸缩的机制来发布并读取 对象的新版本,并延长旧版本们的寿命。这些机制将工作分发到了读和更新路径上,以保证读路径可以极快地运行。在某些场合(非抢占内核),RCU 的读方没有任何性能负担。

问题1:seqlock 不是也允许读线程和更新线程并发工作么?

这个问题可以归结到 “确切地说,什么是RCU?” 这个问题,或许还是 “RCU 可能是如何工作的?” (再或者,不太可能的情况下,问题会变为什么情况下 RCU 不太可能工作)。本文从几个基本的出发点来回答这些问题;之后还会分批地从使用的角度和 API 的角度来看这些问题。最后一篇连载还会给出一组参考文献。

RCU 由三个基本机制组成,第一个用于插入,第二个用于删除,而第三个则用于让读线程可以承受并发的插入或删除。这三个机制将在下面的三节中介绍,讲述如何将 RCU 转化为链表:

  1. 订阅发布机制 (用于插入)
  2. 等待已有的RCU读者完成 (用于删除)
  3. 维护多个最近更新的对象的版本 (为读者维护)

这三个章节之后还有上重点回顾与快速问题答案。

订阅发布机制

RCU的一个关键特性是它可以安全地扫描数据,即使数据正被同时改写也没问题。要提供这种并发插入的能力,RCU使用了一种订阅发布机制。举例说,考虑一 个被初始化为 NULL 的全局指针变量 gp 将要被修改为新分配并初始化的数据结构。下面这段代码(使用附加的合适的锁机制)可以用于这个目的:

1 struct foo {
2 int a;
3 int b;
4 int c;
5 };
6 struct foo *gp = NULL;
7
8 /* . . . */
9
10 p = kmalloc(sizeof(*p), GFP_KERNEL);
11 p->a = 1;
12 p->b = 2;
13 p->c = 3;
14 gp = p;

不幸的是,没有方法强制保证编译器和CPU能顺序执行最后四条语句。如果gp的赋值早于p的各个域的初始化的话,那么并发的读操作将访问到未初始化的变 量。内存屏障(barrier)可以用于保障操作的顺序,但内存屏障以难以使用而闻名。这样我们将他们封装到具有发布语义的 rcu_assign_pointer() 原语之中。最后的四条将成为这样:

1 p->a = 1;
2 p->b = 2;
3 p->c = 3;
4 rcu_assign_pointer(gp, p);

rcu_assign_pointer() 将会发布新的结构,强制编译器和CPU在给p的各个域赋值之后再把指针赋值给gp。然而,仅仅强制更新操作的顺序是不够的,读者也必须强制使用恰当的顺序。考虑下面的这段代码:

1 p = gp;
2 if (p != NULL) {
3 do_something_with(p->a, p->b, p->c);
4 }

尽管这段代码看起来不会受到顺序错乱的影响,不过十分不幸,DEC Alpha CPU 和投机性编译器优化可能会引发问题,不论你是否相信,这的确有可能会导致 p->a, p->b, p->c 的读取会在读取 p 之前!这种情况在投机性编译器优化的情况中最有可能会出现,编译器会揣测p的值,取出 p->a, p->b 和 p->c,之后取出 p 的真实值来检查拽侧的正确性。这种优化非常激进,或者说疯狂,不过在确实会在profile-driven优化时发生。

毫无疑问,我们需要在CPU和编译器上阻止这种情况的发生。rcu_dereference() 原语使用了必要的内存屏障指令和编译器指令来达到这一目的:

1 rcu_read_lock();
2 p = rcu_dereference(gp);
3 if (p != NULL) {
4 do_something_with(p->a, p->b, p->c);
5 }
6 rcu_read_unlock();

rcu_dereference() 原语可以被看作是订阅了指针指向的值,保证接下来的取值操作将会看到对应的发布操作(rcu_assign_pointer())发生之前被初始化的值。 rcu_read_lock() 和 rcu_read_unlock() 绝对是必须的:他们定义了 RCU 读方临界区的范围。他们的目的将在下一节 解释,不过,他们不会自旋或阻塞,也不阻止 list_add_rcu() 的并发执行。事实上,对于非抢占内核,它们不产生任何代码。

虽然 rcu_assign_pointer() 和 rcu_dereference() 在理论上可以用于构建任意 RCU 保护的数据结构,但实际上,使用高层构造常常更好。因此,rcu_assign_pointer() 和 rcu_dereference() 原语被嵌入到了 Linux 的链表维护 API 中的特殊 RCU 变量之中了。Linux 有两个双向链表的变种,循环链表 struct list_head 和线性链表 struct hlist_head/struct hlist_node。前者的结构如下图所示,绿色的方块表示表头,蓝色的是链表中的元素。

Linux list

将上面的指针发布例子放到链表的场景中来就是这样:

1 struct foo {
2 struct list_head list;
3 int a;
4 int b;
5 int c;
6 };
7 LIST_HEAD(head);
8
9 /* . . . */
10
11 p = kmalloc(sizeof(*p), GFP_KERNEL);
12 p->a = 1;
13 p->b = 2;
14 p->c = 3;
15 list_add_rcu(&p->list, &head);

第15行被使用某种同步机制保护住了,通常是某种所,以组织多个 list_add() 实例并发执行。然而,这些同步不能组织同时发生的RCU读者。订阅一个 RCU 保护的链表非常直接:

1 rcu_read_lock();
2 list_for_each_entry_rcu(p, head, list) {
3 do_something_with(p->a, p->b, p->c);
4 }
5 rcu_read_unlock();

list_add_rcu() 原语发布一个节点到制定的链表中去,保证对应的 list_for_each_entry_rcu() 调用都正确的订阅到同一个节点上。

问题2:如果在 list_for_each_entry_rcu() 运行时,刚好进行了一次 list_add_rcu(),如何防止 segfault 的发生呢?

Linux 中的另一个双向链表,hlist,是一个线性表,也就是说,它的头部仅需要一个指针,而不是向循环链表一样需要两个指针。这样,使用 hlist 作为大型哈希表的 hash-bucket 数组的容器将仅消耗一半的内存空间。

Linux hlist

将一个新元素添加到一个 RCU 保护的 hlist 里面与添加到循环链表里非常类似:

1 struct foo {
2 struct hlist_node *list;
3 int a;
4 int b;
5 int c;
6 };
7 HLIST_HEAD(head);
8
9 /* . . . */
10
11 p = kmalloc(sizeof(*p), GFP_KERNEL);
12 p->a = 1;
13 p->b = 2;
14 p->c = 3;
15 hlist_add_head_rcu(&p->list, &head);

和上面一样,第15行一定使用了锁或其他某种同步机制。

订阅一个 RCU 保护的 hlist 也和循环链表非常接近。

1 rcu_read_lock();
2 hlist_for_each_entry_rcu(p, q, head, list) {
3 do_something_with(p->a, p->b, p->c);
4 }
5 rcu_read_unlock();

问题3:为什么我们需要传递两个指针给 hlist_for_each_entry_rcu(), list_for_each_entry_rcu() 可是只需要一个指针的啊?

RCU 发布与订阅原语在如下表中列出,同时给出了 “取消发布”或是撤回的原语

类别
发布
撤销
订阅

类别
发布
撤销
订阅

指针
rcu_assign_pointer()
rcu_assign_pointer(…, NULL)
rcu_dereference()

循环链表
list_add_rcu()
list_add_tail_rcu()
list_replace_rcu()
list_del_rcu()
list_for_each_entry_rcu()

双向链表
hlist_add_after_rcu()
hlist_add_before_rcu()
hlist_add_head_rcu()
hlist_replace_rcu()

hlist_del_rcu()

hlist_for_each_entry_rcu()

注意,list_replace_rcu(), list_del_rcu(), hlist_replace_rcu(), 以及 hlist_del_rcu() 增加了一些复杂度。什么时候释放被替换或删除掉的数据元素才是安全的呢?具体地说,我们怎么能知道所有的读者都释放了他们手中对数据元素的引用呢?

这些问题将在下面的章节中得到回答。

等待已经存在的RCU读者完成

RCU的最基本的功能就是等待一些事情的完成。当然,还有很多其他方法也是用于等待事情完成的,包括引用计数、读写锁、事件等。RCU最大的好处在于它可 以等待所有(比如说)两万件不同点事情,而无需显式地跟踪它们中的每一个,也不需要担心性能的下降、可伸缩性限制、复杂度死锁场景,以及内存泄露等所有这 些显式跟踪手法所固有的问题。

RCU 中,被等待的东西被叫做“RCU读方临界区”。一个RCU读方临界区始于 rcu_read_lock() 原语,止于 rcu_read_unlock() 原语。RCU 读方临界区可以嵌套,也可以放入很多代码,只要这些代码显式阻塞或睡眠即可(有一种称为“SRCU”的特殊RCU允许在它的读方临界区中睡眠)。只要你遵守这些约定,你就可以使用RCU来等待任何期望的代码段的完成。

正如其他地方对经典RCU实时RCU的描述,RCU 通过间接确定这些其他事情的完成时间来达到这一目的。

具体地说,如下图所示,RCU是一种等待已经存在的RCU读方临界区结束的方法,包括这些临界区中执行的内存操作。

Grace periods extend to contain pre-existing RCU read-side critical sections.

注意,开始于一个给定宽限期开始之后的RCU读方临界区能够、并可以延续到该宽限期结束之后。

下面的伪码展示了使用RCU等待读者的基本算法形式:

  1. 进行改动,比如,替换链表中的一个元素。
  2. 等待所有已经存在的RCU读方临界区完成(比如,使用synchronize_rcu()原语)。关键点是接下来的RCU读方临界区将无法得到新近删除的元素的引用了。
  3. 清理,比如,释放上述所有被替换的元素。

下面的代码段是从前一节修改而得的,用于说明这一过程,这里面的域a是这个搜索的键值。

1 struct foo {
2 struct list_head list;
3 int a;
4 int b;
5 int c;
6 };
7 LIST_HEAD(head);
8
9 /* . . . */
10
11 p = search(head, key);
12 if (p == NULL) {
13 /* Take appropriate action, unlock, and return. */
14 }
15 q = kmalloc(sizeof(*p), GFP_KERNEL);
16 *q = *p;
17 q->b = 2;
18 q->c = 3;
19 list_replace_rcu(&p->list, &q->list);
20 synchronize_rcu();
21 kfree(p);

第19、20 和 21 行实现了上面所说的三个步骤。第 16-19行展现了 RCU 的名字(读-复制-更新):在允许进行并发读操作的同时,第16行进行了复制,而第17-19行进行了更新。

乍一看会觉得 synchronize_rcu() 原语显得比较神秘。毕竟它必须等所有读方临界区完成,而且,正如我们前面看到的,用于限制RCU读方临界区的rcu_read_lock() 和 rcu_read_unlock() 原语在非抢占内核中甚至什么代码都不会生成。

这里有一个小伎俩,经典RCU通过 rcu_read_lock() 和 rcu_read_unlock() 界定的读方临界区是不允许阻塞和休眠的。因此,当一个给定的CPU要进行上下文切换的时候,我们可以确定任何已有的RCU读方临界区都已经完成了。也就是说,只要每个CPU都至少进行了一次上下文切换,那么所有先前的 RCU 读方临界区也就保证都完成了,即 synchronize_rcu() 可以安全返回了。

因此,经典RCU的 synchronize_rcu() 从概念上说可以被简化成这样:

1 for_each_online_cpu(cpu)
2 run_on(cpu);

这里,run_on() 将当前线程切换到指定 CPU,来强制该 CPU 进行上下文切换。而 for_each_online_cpu() 循环强制对每个 CPU 进行一次上下文切换。虽然这个简单的方法可以在一个不支持抢占的内核上工作,换句话说,对 non-CONFIG_PREEMPT 和 CONFIG_PREEMPT,但对 CONFIG_PREEMPT_RT 实时 (-rt) 内核无效。因此,实时RCU使用了一个(松散地)基于引用计数的方法。

当然,在真实内核中的实现要复杂得多了,因为它需要管理终端,NMI,CPU热插拔和其他实际内核中的可能有的风险,而且还要维护良好的性能和可伸缩性。RCU的实时实现还必须拥有良好的实时响应能力,这就使得(像上面两行那样)直接禁止抢占变得不可能了。

虽然我们了解到了 synchronize_rcu() 的简单实现原理,不过还有很多其它问题呢。比如,RCU读者们在读一个正在被并发地更新的链表的时候究竟读到了什么呢?这个问题将在下一节讲到。

维护多个版本的近期更新的对象

本节将展示 RCU 如何为多个不需要同步的读者维护不同版本的链表。我们使用两个例子来展示一个可能被给定的读者引用的元素必须在该读者处于读方临界区的整个过程中保持完好无损。第一个例子展示了链表元素的删除,而第二个例子则展示了元素的替换。

例1:在删除时维护多个版本

要开始这个“删除”的例子,我们先把上节这个例子的 11-21行改成如下的形式:

1 p = search(head, key);
2 if (p != NULL) {
3 list_del_rcu(&p->list);
4 synchronize_rcu();
5 kfree(p);
6 }

这个链表以及指针p的最初情况是这样的:

Initial list state.

表中每个元素的三元组分别代表域a, b, c。红色的便捷表明读者可以获取它们的指针,而且因为读操作和更新操作不是直接同步的,读者可以在这个删除的过程中同时发生。这里我们为了清晰没有画出双向链表的反向指针。

在第三行的 list_del_rcu() 完成的时候,5,6,7 这个元素已经被从链表中删除了(如下图)。由于读者并不直接和更新操作同步,读者可能同时正在扫描这个链表。由于访问时间不同,这些并发读者可能看到、也 可能没看到新近删除的元素。不过,那些在获取指针之后延迟了读操作的读者(比如因为中断、ECC内存错误,或在 CONFIG_PREEMPT_RT内核中因为抢占而延迟了的)可能仍然会在删除之后的一段时间内看到那个老的链表的版本。下图中 5,6,7 元素的边框仍然是红色的,这意味着仍然有读者可能会引用它。

After deletion.

这里注意,在退出读方临界区之后,读者们就不能再持有 5,6,7 这个元素的引用了。所以,一旦第4行的 synchronize_rcu() 完成了,所有已有读者也就保证都完成了,这样就没有读者会访问这个元素了,下图中,这个元素的边框也变黑了。我们的链表也回到了一个单一的版本了。

After deletion.

这之后,5,6,7 这个元素就可以被安全的释放了:

After deletion.

这里,我们完成了删除 5,6,7 这个元素的操作,下一小节将介绍替换操作。

例2:在替换的过程中维护数据的多个不同版本

在开始替换的例子钱,我们再修改一下前面例子的最后几行:

1 q = kmalloc(sizeof(*p), GFP_KERNEL);
2 *q = *p;
3 q->b = 2;
4 q->c = 3;
5 list_replace_rcu(&p->list, &q->list);
6 synchronize_rcu();
7 kfree(p);

这个链表的初始状态和指针p和删除的那个例子是完全一样的:

Initial list state.

和之前一样,每个元素里面的三元组分别代表域 a, b 和 c。红色的边框代表了读者可能会持有这个元素的引用,因为读者和更新者没有直接的同步,读者可能会和整个替换过程并发进行。再次说明,这里我们为了清晰,再次省略了反向指针。

第一行的 kmalloc() 生成了一个替换元素,如下:

List state after allocation.

第二行把旧的元素的内容拷贝给新的元素:

List state after copy.

第三行,将 q->b 更新为2:

List state after update of b.

第四行,将 q->c 更新为3:

List state after update of c.

现在,第5行进行替换操作,这里,新元素最终对读者可见了。到了这里,如下所示,我们有了这个链表的两个版本。先前已经存在的读者可以看到 5,6,7 元素,而新读者将看到 5,2,3 元素。不过,任何读者都被保证可以看到一个完整的链表。

List state after replacement.

第6行的 synchronize_rcu() 返回后,宽限期将完成,所有在 list_replace_rcu() 之前开始的读者都将完成。具体地说,任何可能持有 5,6,7 的读者都已经退出了他们的读方临界区,这就保证他们不再持有一个引用。因而也在没有任何读者持有老元素的引用了,途中,5,6,7 元素的边框也就变黑了。对于读者来说,目前又只有一个单一的链表版本了,只是新的元素已经替代了旧元素的位置。

List state after grace period.

第七行的 kfree() 完成后,链表旧成为了如下的样子:

List state after grace period.

尽管 RCU 是以替换而命名的,但内核中的大多数使用都是前面小节 中的简单删除的情况。

讨论

这个例子假设在更新操作的过程中保存着一个互斥量,也就是说,这个链表在一个给定时间最多有两种版本。

问题4:如何修改删除的例子,来允许超过两个版本的链表可以同时存在?

问题5:在某一时刻,RCU最多可以有多少个链表的版本?

这组例子显示了RCU使用多个版本来保障在存在并发读者的情况下的安全更改数据。当然,一些算法是无法很好地支持多个版本的。有一个参考文献 介绍了如何对这些算法进行改造以使用RCU,不过,这超出了本文的讨论范围了。

小结

本文介绍了基于RCU的算法的三个基本部分:

  1. 对与添加新数据的发布-订阅机制
  2. 等待已有RCU读者完成,以及
  3. 维护多个版本以便在不顺坏或严重延迟RCU读者的情况下,允许更改。

问题6:如果 rcu_read_lock() 与 rcu_read_unlock() 之间没有自旋锁或阻塞,RCU更新者会怎样延迟RCU读者?

这三个RCU的组成部分允许数据在并发读者访问的同时更新数据,并可以以多种方式实现基于RCU的算法,一些算法将会在接下来的“What is RCU, Really?”系列中继续介绍。

原文地址:https://www.cnblogs.com/alantu2018/p/8468870.html