伤害 等待互斥锁

序言:近期读Linux 5.15的发布说明,该版本合并了实时锁机制,当开启配置宏CONFIG_PREEMPT_RT的时候,这些锁被基于实时互斥锁的变体替代:mutex、ww_mutex、rw_semaphore、spinlock和rwlock。第一次听说ww_mutex,在百度上查找的时候发现介绍文档很少,于是自己学习,写成笔记。

在某些场合必须同时持有多个锁,并且获取锁的顺序可能不同,为了避免死锁,应该使用伤害/等待互斥锁(Wound/Wait Mutexes)。获取一个锁集合称为一个事务(transaction),每个事务关联一张门票(ticket),门票也称为序列号,根据门票判断哪个事务年轻。有2种处理死锁的方法,如下。

(1) 等待-死亡(Wait-Die)算法:一个事务申请另一个事务已经获取的锁的时候,如果持有锁的事务年轻,那么申请锁的事务等待(wait);如果持有锁的事务年老,那么申请锁的事务退避并且死亡(die)。

(2) 4.19版本开始支持伤害-等待(Wound-Wait)算法:一个事务申请另一个事务已经获取的锁的时候,如果持有锁的事务年轻,那么申请锁的事务伤害(wound)持有锁的事务,请求它去死亡;如果持有锁的事务年老,那么申请锁的事务等待(wait)。

假设进程1和进程2分别在2个处理器上运行,进程1获取锁A,进程2获取锁B,然后进程1申请锁B,进程2申请锁A。假设进程1的门票编号比进程2的门票编号小,也就是进程1年老,进程2年轻。

两种算法都是公平的,因为其中一个事务最终会成功。和等待-死亡算法相比,伤害-等待算法生成的退避少,但是从一次退避恢复的时候要做更多的工作。伤害-等待算法是一种抢占性的算法(因为事务被其它事务伤害),需要一种可靠的方法来选择受伤状态和抢占正在运行的事务。在伤害-等待算法中,一个事务在受伤后死亡(返回“-EDEADLK”),就认为这个事务被抢占。

如果竞争锁的进程少,并且希望减少回滚的次数,那么应该选择伤害-等待算法。

和普通的互斥锁相比,伤害/等待互斥锁增加了下面2个概念。

(1) 获取上下文(acquire context):一个获取上下文表示一个事务,关联一张门票(ticket),门票也称为序列号,门票编号小表示年老,门票编号大表示年轻。获取上下文跟踪调试状态,捕获对伤害/等待互斥锁接口的错误使用。

(2) 伤害/等待类:初始化获取上下文的时候需要指定锁类,锁类会给获取上下文分配门票。锁类也指定算法:等待-死亡(Wait-Die)或伤害-等待(Wound-Wait)。当多个进程竞争同一个锁集合的时候,它们必须使用相同的锁类。

有3种获取伤害/等待互斥锁的函数,如下。

(1) 普通的获取锁函数ww_mutex_lock(),带有获取上下文。

(2) 进程在回滚(即释放所有已经获取的锁)以后,使用慢路径获取锁函数ww_mutex_lock_slow()获取正在竞争的锁。带有“_slow”后缀的函数不是必需的,因为可以调用函数ww_mutex_lock()获取正在竞争的锁。带有“_slow”后缀的函数的优点是接口安全,如下。

  • 函数ww_mutex_lock()有一个整数返回值,而函数ww_mutex_lock_slow()没有返回值。
  • 当开启调试的时候,函数ww_mutex_lock_slow()检查所有已经获取的锁已经被释放,并且确保进程阻塞在正在竞争的锁上面。

(3) 只获取一个伤害/等待互斥锁,和获取普通的互斥锁完全相同。调用函数ww_mutex_lock(),把获取上下文指定为空指针。

伤害/等待互斥锁的使用方法如下。

(1) 定义一个锁类,锁类在初始化获取上下文的时候需要,锁类也指定算法:等待-死亡(Wait-Die)或伤害-等待(Wound-Wait)。


/* 指定等待-死亡算法 */
static DEFINE_WD_CLASS(my_class);

/* 指定伤害-等待算法 */
static DEFINE_WW_CLASS(my_class);

(2) 初始化一个获取上下文,锁类会给获取上下文分配一张门票。

void ww_acquire_init(struct ww_acquire_ctx *ctx, struct ww_class *ww_class);

(3) 获取锁,返回0表示获取成功,返回“-EDEADLK”表示检测出死锁。

int ww_mutex_lock(struct ww_mutex *lock, struct ww_acquire_ctx *ctx);

(4) 获取需要的所有锁以后,标记获取阶段结束。目前这个函数没有执行任何操作,但是将来可能改变。

void ww_acquire_done(struct ww_acquire_ctx *ctx);

(5) 释放锁。

void ww_mutex_unlock(struct ww_mutex *lock);

(6) 释放所有锁以后,释放获取上下文。

void ww_acquire_fini(struct ww_acquire_ctx *ctx);

下面是一个例子,注意:调用函数ww_mutex_lock()申请锁失败以后,应该先释放已经获取的锁,然后调用慢路径函数ww_mutex_lock_slow()获取正在竞争的锁,最后获取其它锁。重新开始申请锁的时候必须改变申请顺序,因为如果按照原来的顺序申请锁,那么会把刚释放的锁抢回来。


/* 第1步:定义锁类,指定伤害-等待算法。*/
static DEFINE_WW_CLASS(ww_class);

struct obj {
  struct ww_mutex lock;
  /* obj data */
};

struct obj_entry {
  struct list_head head;
  struct obj *obj;
};

int lock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
  struct obj *res_obj = NULL;
  struct obj_entry *contended_entry = NULL;
  struct obj_entry *entry;
  int ret;

  /* 第2步:初始化获取上下文。*/
  ww_acquire_init(ctx, &ww_class);

  /* 第3步:获取锁。*/
retry:
  list_for_each_entry(entry, list, head) {
    if (entry->obj == res_obj) {
      res_obj = NULL;
      continue;
    }

    ret = ww_mutex_lock(&entry->obj->lock, ctx);
    if (ret < 0) {
      contended_entry = entry;
      goto err;
    }
  }

  /* 第4步:标记获取阶段结束。*/
  ww_acquire_done(ctx);
  return 0;

err:
  /* 回滚,释放已经获取的锁。*/
  list_for_each_entry_continue_reverse(entry, list, head) {
    ww_mutex_unlock(&entry->obj->lock);
  }

  if (res_obj) {
    ww_mutex_unlock(&res_obj->lock);
  }

  if (ret == -EDEADLK) {
    /* 使用慢路径获取锁函数获取正在竞争的锁。*/
    ww_mutex_lock_slow(&contended_entry->obj->lock, ctx);
    res_obj = contended_entry->obj;
    /* 获取其它锁。*/
    goto retry;
  }
  ww_acquire_fini(ctx);

  return ret;
}

void unlock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
  struct obj_entry *entry;

  /* 第5步:释放锁。*/
  list_for_each_entry (entry, list, head) {
    ww_mutex_unlock(&entry->obj->lock);
  }

  /* 第6步:释放获取上下文。*/
  ww_acquire_fini(ctx);
}
原文地址:https://www.cnblogs.com/linhaostudy/p/15516962.html