协程库中 WaitGroup / CountDownLatch 实现

协程库中 WaitGroup / CountDownLatch 实现

这几天读了一些协程的文章。看了看开源协程库,在腾讯的 libco 和魅族的 libgo 中选择了 libgo,结果发现这边提供了协程池,但却没有提供CountDownLatch、SyncClosure之类的设置,要是每次起一组任务都要用一个协程池的话,有点大材小用了。这里我们手动实现一个 CountDownLatch。

先说结论,选用最后一种方案。


由于开始的时候,没有去读过线程池的实现,所以走了一些弯路。

计数器 + co_yield

第一种思路,使用计数器,Done时自减,Wait检查计数器并主动让出CPU。
这种做法使用到了 co_yield 让出CPU,但网上有人提出协程较少的时候,会发生CPU占用100%的问题。
举个例子:进程使用了 4 个线程,现在一个协程在等待8个协程,其中7个已经完成,只有一个还有较长时间才能结束,并且没有其他协程加入到任务队列了。这个时候 Wait 让出CPU之后很快又会被线程调度上CPU,检查到引用计数不满足条件后再次让出CPU,然后又立刻被调度上CPU检查,如此循环直到满足条件结束任务。
问题原因在于,我们这里只是让出了CPU,Wait仍然还是放在thread的ready队列中,而不是放在blocked队列。我们需要一种机制,wait需要的信号,并在信号到来的时候重新进入ready队列。这种机制类似于线程调度中的 【条件变量】 —— 条件不满足则 idle,条件发生时唤醒线程。

class CountDownLatch {
public:
    explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
    }
    void Add(size_t n) {
        mFlyingCount += n;
    }
    void Done() {
        mFlyingCount--;
    }
    void Wait() {
        while(mFlyingCount > 0) {
            co_yield;
        }
    }
private:
    std::atomic<size_t> mFlyingCount;

    CountDownLatch(CountDownLatch const &) = delete;
    CountDownLatch(CountDownLatch &&) = delete;
    CountDownLatch& operator=(CountDownLatch const &) = delete;
    CountDownLatch& operator=(CountDownLatch &&) = delete;
};

channel一对一阻塞

第二种思路,最简单但是开销比较大:使用 n 个不带缓冲的 channel 来阻塞 Wait 操作。

class CountDownLatch {
public:
    explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
        chans.resize(n, co_chan<int>());
    }
    void Done() {
        chans[--mFlyingCount].Close();
    }
    void Wait() {
        int x;
        for (auto& ch : chans) {
            ch >> x;
        }
    }
private:
    std::atomic<size_t> mFlyingCount;
    std::vector<co_chan<int>> chans;

    CountDownLatch(CountDownLatch const &) = delete;
    CountDownLatch(CountDownLatch &&) = delete;
    CountDownLatch& operator=(CountDownLatch const &) = delete;
    CountDownLatch& operator=(CountDownLatch &&) = delete;
};

一个channel唤醒Wait

看上面的源码我们发现,由于我们是从后往前关闭的channel,等待channel时却是从前往后,因此实际阻塞Wait的只有第一个channel,于是有了进一步简化的方案。
第三种思路,使用不带缓冲的channel作为通信机制,最后一个结束的Done唤醒Wait协程。这种实现起来很简单,依靠的是 channel 来作为条件变量。

class CountDownLatch {
public:
    explicit CountDownLatch(size_t n) : mFlyingCount(n) {}

    void Add(size_t i) {
        mFlyingCount += i;
    }
    void Done() {
        if (--mFlyingCount == 0) {
            ch_0.Close();
        }
    }
    void Wait() {
        int x;
        ch_0 >> x;
    }
private:
    std::atomic<size_t> mFlyingCount {1};
    co_chan<int> ch_0;

    CountDownLatch(CountDownLatch const &) = delete;
    CountDownLatch(CountDownLatch &&) = delete;
    CountDownLatch& operator=(CountDownLatch const &) = delete;
    CountDownLatch& operator=(CountDownLatch &&) = delete;
};

mutex + condition_variable

既然第三种方案提到了条件变量,那么可不可以直接使用条件变量呢?可以的!
第四种实现,剥离无用的上层包装,直接使用协程的条件变量来处理等待条件。这种方案其实脱胎于线程池的 CountDownLatch,只要协程的 mutex 和 condition_variable 实现的没有问题,这个实现就是可以正常工作的。

class CountDownLatch {
public:
    explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {}

    void Done() {
        std::unique_lock<::co::CoMutex> lck(mu);
        if (--mFlyingCount <= 0) {
            cv.notify_all();
        }
    }
    void Wait() {
        std::unique_lock<::co::CoMutex> lck(mu);
        while (mFlyingCount > 0) {
            cv.wait(lck);
        }
    }
private:
    size_t mFlyingCount;
    ::co::CoMutex mu;
    ::co::ConditionVariableAny cv;

    CountDownLatch(CountDownLatch const &) = delete;
    CountDownLatch(CountDownLatch &&) = delete;
    CountDownLatch& operator=(CountDownLatch const &) = delete;
    CountDownLatch& operator=(CountDownLatch &&) = delete;
};

这里有几个需要注意的点:

  • 第一个是锁,操作 condition_variable 是需要和锁搭配使用的,不能单独使用 condition_variable;
  • 第二是 unique_lock,C++11 thread库 提供了两种锁专用的智能指针,lock_guard 和 unique_lock,区别是前者会一直持有锁,后者可以在wait的时候解锁再加锁;
  • 第三,不仅cv需要锁保护,计数器操作也需要加锁,计数器和cv放到一起才是一个完整的临界资源,而不是单独来看的。
原文地址:https://www.cnblogs.com/zhcpku/p/15223847.html