C++实现singleflight

groupcache 里面实现了一个 singleflight 包,感觉很有用,于是参考它实现了一个C++版本。

参考资料:

实现代码

代码下载:ingleflight的C++实现版本

singleflight.hpp

// solym
// ymwh@foxmail.com
// 2020年3月16日 20点28分

#include <map>
#include <memory>
#include <mutex>
#include <condition_variable>


template<typename _Kty, typename _Ty>
class SingleFlight
{
    // 保存实际执行结果
    struct _Result
    {
        bool _done; // 条件量
        // 这里也可以使用读写锁来实现,会简单一点
        // 第一个操作的线程加写锁,后续的线程加读锁,写完成之后,读锁不再阻塞,即可获取结果
        std::mutex _mtx;    // 条件变量互斥锁
        std::condition_variable _cv;    // 条件变量,用于通知
        // _result 用于保存唯一那个真的执行处理的结果
        // 这里需要考虑 Do 参数 func 函数的实际输入输出参数
        // 不一定是返回值
        _Ty _result;
    };
    // 实际执行的结果保存
    std::mutex _domtx;
    std::map<_Kty, std::shared_ptr<_Result>> _do;

public:
    // 执行操作
    // key  用于区分请求
    // func 实际执行操作的函数
    // args 实际执行操作函数的参数
    template<class F, typename... Args>
    _Ty Do(_Kty key, F&& func, Args&&... args)
    {
        // 先加写锁,后面可能要修改
        std::unique_lock<std::mutex> lock(_domtx);
        // 判断是否已经存在执行结果
        auto iter = _do.find(key);
        // 存在就等待完成
        if (iter != _do.end())
        {
            // 获取实际执行结果结构
            std::shared_ptr<_Result> pRes = iter->second;
            lock.unlock();
            // 等待条件成立(也就是实际执行的那个线程执行完成)
            std::unique_lock<std::mutex> lck(pRes->_mtx);
            pRes->_cv.wait(lck, [pRes]() -> bool { return pRes->_done; });
            // 获取执行结果进行返回
            return pRes->_result;
        }
        // 不存在就创建一个操作结果
        std::shared_ptr<_Result> pRes = std::make_shared<_Result>();
        pRes->_done = false;    // 设置初始条件为 false
        _do[key] = pRes;
        lock.unlock(); // 解锁,别的线程能够继续

        // 执行真正的操作,获取返回结果
        pRes->_result = func(args...);
        {
            std::lock_guard<std::mutex> lck(pRes->_mtx);
            pRes->_done = true;
        }
        // 通知所有线程
        pRes->_cv.notify_all();
        // 移除(也可以放在一个延迟移除队列,进行异步移除,以便后续的
        // 相同key操作也可以直接使用。但在这外面缓存结果会更好)
        lock.lock();
        _do.erase(key);
        return pRes->_result;
    }
};

测试代码 singleflight_test.cpp

// solym
// ymwh@foxmail.com
// 2020年3月16日 20点28分

#include "singleflight.hpp"

#include <string>
#include <thread>
#include <vector>

#include <chrono>
#include <ctime>
#include <iomanip>

#include <iostream>
#include <sstream>


// 获取时间戳
std::string timestamp()
{
    std::time_t now = std::time(nullptr);
    std::ostringstream oss;
    oss << std::put_time(std::localtime(&now), "%Y-%m-%d %H.%M.%S ");
    return oss.str();
}
// 获取当前线程ID
std::string threadid()
{
    std::thread::id this_id = std::this_thread::get_id();
    std::ostringstream oss;
    oss << this_id;
    return oss.str();
}

// 输出简单日志
void outlog(const char* msg)
{
    printf("%s [%8s] %s
",timestamp().c_str(),threadid().c_str(), msg);
}

// 一个简单的测试函数(假设是比较耗时的绘图操作)
void draw(int w, int h, std::vector<uint8_t>& out)
{
    outlog(" draw begin ");
    // 休眠
    std::this_thread::sleep_for(std::chrono::seconds(3));
    for (size_t j = 0; j < 10; ++j)
    {
         out[j] = 'a' + j;
    }
    outlog(" draw ended");
}

int main()
{
    // 创建一个 singlefilght 对象
    // 这里用于接收返回值的是一个 shared_ptr 避免多次拷贝返回结果
    SingleFlight<std::string, std::shared_ptr<std::vector<uint8_t> > > sl;

    // 创建十个线程
    for (size_t i = 0; i < 10; ++i)
    {
        std::thread thr([&sl]() {
            outlog(" Begin");
            // 对 draw 操作进行包装
            // 便于输出需要的结果形式
            std::shared_ptr<std::vector<uint8_t> > out = sl.Do(
              "100", [](int h, int w) -> std::shared_ptr<std::vector<uint8_t> > {
                  std::vector<uint8_t> out(100,0);
                  draw(h, w, out);
                  return std::make_shared<std::vector<uint8_t> >(out);
              },
              100,
              100);
            char buffer[256];
            sprintf(buffer," %p Out: %s",out->data(),(char*)out->data());
            outlog(buffer);
            
            std::cout << std::endl;
            outlog(" Ended: ");
        });
        thr.detach();   // 线程分离执行
    }

    outlog(" Main Begin ");
    // 休眠等待所有线程正常返回
    std::this_thread::sleep_for(std::chrono::seconds(5));

    outlog(" Main Ended ");

    return 0;
}
原文地址:https://www.cnblogs.com/oloroso/p/12506362.html