更为适合并发的极简易队列

上一篇文章里说到了一个极简易队列的实现,然而它对于并发存在一个问题,就是当多个或者说就是两个线程并发地访问队列,分别调用 push() 与 tryPop() 时,可能就会导致数据争用或者死锁。

以下是一种思路,通过分离数据允许并发。其大致思路是预先分配一个不储存任何数据的结点占位,当 push() 进一个新结点时,将 push 的值作为 data 存入当前数据为空的 tail,然后再在 tail 后面构建个新的用于占位的空结点。这样的设计使 push() 操作仅访问 tail,而 tryPop() 虽然还是两个都访问,但是对于 head 的访问极短暂。

template<typename T>
class Queue
{
private:
    struct Node{
        std::shared_ptr<T> data;
        std::unique_ptr<T> next;  
    };
    
    std::unique_ptr<T> head;
    Node*              tail;
public:
    Queue():head(std::make_shared<Node>()), tail(head.get ())
    {}
    Queue(const Queue&)            = delete;
    Queue& operator=(const Queue&) = delete;
    
    std::shared_ptr<T> tryPop()
    {
        if (tail == head.get ()){
            return std::shared_ptr<T>();
        }
        
        auto const result  = head->data();
        auto const oldHead = std::move(head);
        head = std::move(oldHead->next);
        return result;
    }
    
    void push(T newValue)
    {
        auto const tailData = std::make_shared<T>(std::move(newValue));
        tail->data = tailData;
        auto const nextTail = std::make_unique<T>();
        auto const newTail  = nextTail.get();
        tail->next = std::move(nextTail);
        tail = newTail;
    }
};

当然目前还是单线程的,但是这个版本离线程安全的队列又进了一步。

而根据这个就能再进一步地写出相对更高效的并发队列:

template<typename T>
class ThreadsafeQueue
{
private:
    struct Node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<T> next;
    };

    std::unique_ptr<Node> head;
    Node*                 tail;
    mutable std::mutex    headMutex;
    mutable std::mutex    tailMutex;

    Node* getTail() const
    {
        std::lock_guard<std::mutex> lock(tailMutex);
        return tail;
    }

    std::unique_ptr<Node> popHead()
    {
        std::lock_guard<std::mutex> lock(headMutex);
        if (getTail () == head.get ()){
            return nullptr;
        }

        auto const oldHead = std::move(head);
        head = std::move(oldHead->next);
        return oldHead;
    }

public:
    ThreadsafeQueue():
        head(std::make_shared<Node>()), tail(head.get ())
    {}

    ThreadsafeQueue(const ThreadsafeQueue&)            = delete;
    ThreadsafeQueue& operator=(const ThreadsafeQueue&) = delete;

    std::shared_ptr<T> tryPop()
    {
        auto const popedHead = popHead ();
        return (popedHead == nullptr? std::shared_ptr<T>() : popedHead->data;
    }

    void push(T newValue)
    {
        auto newData = std::make_shared<T>(std::move(newValue));
        auto nextTail = std::make_unique<Node>();
        auto newTail  = nextTail.get();
        std::lock_guard<std::mutex> lock(tailMutex);
        tail->data = newData;
        tail->next = std::move(nextTail);
        tail = newTail;
    }
};

这里面最重要的是 tailMutex 因为若没有这个互斥元,则 push() 和 tryPop() 可能会在不同的线程中造成数据竞争或未定义行为。而因为在 push() 和 tryPop() 中都存在 tailMutex 所以即便是在不同的线程里调用这两个函数也能保证 getTail() 要不得到的是真正 tail 的本身或者它的前一个结点。

另外一个重点就是 headMutex,这个的重要性还是用反证法说明,假如我这么写:

std::unique_ptr<Node> popHead()
{
    auto const oldTail = getTail();              // 1
    std::lock_grand<std::mutex> lock(headMutex); // 2
    
    if (head.get() == oldTail)
    {
        return nullptr;
    }      
    auto oldHead = std::move(head);
    head = std::move(oldHead->next);
    return oldHead;
}

这里就可能出现一个问题,就是如果线程 A 在 1 这个地方取得 tail 之后暂停,接着其他的线程 BCDEFG 中把这个 ThreadsafeQueue pop 得连 A 中取得的 tail 都出去了,接下来 A 再执行的时候直接就傻逼了,或者说就算 A 中取得的 tail 没出去,head 也被改变过了,后面的操作也成 UB 了。

 
原文地址:https://www.cnblogs.com/wuOverflow/p/4836236.html