环形无锁队列

环形无锁队列

 

环形无锁队列

1 环形无锁队列的实现

数据结构定义:

template class LockFreeQueue
{
  private:
    ElementT *mArray;
    int mCapacity;
    int mFront;
    int mTail;
}

由于出队操作是在队首进行,入队操作是在队尾进行,因此,我们可以尝试用mFront和mTail来实现多个线程之间的协调。这其中会用到CAS操作:

入队操作伪码:

……

do {
    获取当前的mTail的值:curTailIndex;
    计算新的mTail的值:newTailIndex = (newTailIndex + 1) % size;
} while(!CAS(mTail, curTailIndex, newTailIndex));
插入元素到curTailIndex;

其中的do-while循环实现的是一个忙式等待:线程试图获取当前的队列尾部空间的控制权;一旦获取成功,则向其中插入元素。

但是这样出队的时候就出现了问题:如何判断队首的位置里是否有相应元素呢?仅使用mFront来判断是不行的,这只能保证出队进程不会对同一个索引位置进行出队操作,而不能保证mFront的位置中一定有有效的元素。因此,为了保证出队队列与入队队列之间的协调,需要在LockFreeQueue中添加一个标志数组:

char *mFlagArray;

mFlagArray中的元素标记mArray中与之对应的元素位置是否有效。mFlagArray中的元素有4个取值:

  • 0表示对应的mArray中的槽位为空;
  • 1表示对应槽位已被申请,正在写入;
  • 2表示对应槽位中为有效的元素,可以对其进行出对操作;
  • 3则表示正在弹出操作。

修改后的无锁队列的代码如下:

template class LockFreeQueue
{
  public:
    LockFreeQueue(int s = 0)
    {
        mCapacity = s;
        mFront = 0;
        mTail = 0;
        mSize = 0;
    }

    ~LockFreeQueue() {}

    /**
     * 初始化queue。分配内存,设定size
     * 非线程安全,需在单线程环境下使用
     */
    bool initialize()
    {
        mFlagArray = new char[mCapacity];
        if (NULL == mFlagArray)
            return false;
        memset(mFlagArray, 0, mCapacity);

        mArray = reinterpret_cast(new char[mCapacity * sizeof(ElementT)]);
        if (mArray == NULL)
            return false;
        memset(mArray, 0, mCapacity * sizeof(ElementT));

        return true;
    }

    const int capacity(void) const
    {
        return mCapacity;
    }

    const int size(void) const
    {
        return mSize;
    }

    /**
     * 入队函数,线程安全
     */
    bool push(const ElementT & ele)
    {
        if (mSize >= mCapacity)
            return false;

        int curTailIndex = mTail;

        char *cur_tail_flag_index = mFlagArray + curTailIndex;

        //// 忙式等待
        // while中的原子操作:如果当前tail的标记为已占用(1),则更新cur_tail_flag_index,继续循环;否则,将tail标记设为已经占用
        while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1))
        {
            curTailIndex = mTail;
            cur_tail_flag_index = mFlagArray +  curTailIndex;
        }

        //// 两个入队线程之间的同步
        int update_tail_index = (curTailIndex + 1) % mCapacity;

        // 如果已经被其他的线程更新过,则不需要更新;
        // 否则,更新为 (curTailIndex+1) % mCapacity;
        __sync_bool_compare_and_swap(&mTail, curTailIndex, update_tail_index);

        // 申请到可用的存储空间
        *(mArray + curTailIndex) = ele;

        // 写入完毕
        __sync_fetch_and_add(cur_tail_flag_index, 1);

        // 更新size;入队线程与出队线程之间的协作
        __sync_fetch_and_add(&mSize, 1);
        return true;
    }

    /**
     * 出队函数,线程安全
     */
    bool pop(ElementT *ele)
    {
        if (mSize <= 0)
            return false;

        int cur_head_index = mFront;
        char *cur_head_flag_index = mFlagArray + cur_head_index;
        while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3))
        {
            cur_head_index = mFront;
            cur_head_flag_index = mFlagArray + cur_head_index;
        }

        // 取模操作可以优化
        int update_head_index = (cur_head_index + 1) % mCapacity;
        __sync_bool_compare_and_swap(&mFront, cur_head_index, update_head_index);
        *ele = *(mArray + cur_head_index);

        // 弹出完毕
        __sync_fetch_and_sub(cur_head_flag_index, 3);

        // 更新size
        __sync_fetch_and_sub(&mSize, 1);

        return true;
    }
  private:
    ElementT *mArray;
    int mCapacity; // 环形数组的大小
    int mSize; //队列中元素的个数
    int mFront;
    int mTail;
    char *mFlagArray; // 标记位,标记某个位置的元素是否被占用
};

2 死锁及饥饿

LockFreeQueue实现了基本的多线程之间的协调,不会存在多个线程同时对同一个资源进行操作的情况,也就不会产生数据竞跑,这保证了对于这个队列而言,基本的访问操作(出队、入队)的执行都是安全的,其结果是可预期的。

在多线程环境下,LockFreeQueue会不会出现死锁的情况呢?死锁有四个必要条件:

  1. 对资源的访问是互斥的;
  2. 请求和保持请求;
  3. 资源不可剥夺;
  4. 循环等待。

在LockFreeQueue中,所有的线程都是对资源进行申请后再使用,一个线程若申请到了资源(这里的资源主要指环形队列中的内存槽位),就会立即使用,并且在使用完后释放掉该资源。不存在一个线程使用A资源的同时去申请B资源的情况,因此并不会出现死锁。

但LockFreeQueue可能出现饥饿状态。例如,对两个出队线程A、B,两者都循环进行出队操作。当队列中有元素时,A总能申请到这个元素并且执行到弹出操作,而B则只能在DeQueue函数的while循环中一直循环下去。

3 一些优化

对LockFreeQueue可以进行一些优化。比如:

  1. 对于环形数组大小,可以设定为2的整数倍,如1024。这样取模的操作即可以简化为与mCapacity-1的按位与操作。
  2. 忙式等待的时候可能会出现某个线程一直占用cpu的情况。此时可以使用sleep(0),让该线程让出CPU时间片,从就绪态转为挂起态。

Date: 2015-12-01T23:33+0800

Author: ruleless

Org version 7.9.3f with Emacs version 24

Validate XHTML 1.0
原文地址:https://www.cnblogs.com/liuyang1012525/p/5011695.html