(十五)线程池的实现

1. 轮询方式

#include <iostream>
#include <string>

#include <memory>
#include <vector>
#include <mutex>
#include <queue>
#include <thread>
#include <atomic>

// interface
class Runnable{
public:
    virtual void Run() = 0;
};

// thread pool
class FixedThreadPool{
public:
    FixedThreadPool(int size) :threadPoolSize(size){}

    ~FixedThreadPool(){
        if (running){
            Stop();
        }
    }

    void Start(){
        running = true;
        for (int i = 0; i < threadPoolSize; ++i){
            std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
            std::lock_guard<std::mutex> lk(txThread);
            threads.push_back(t);
        }
    }

    void Stop(){
        bool flag = true;
        while (flag){
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::lock_guard<std::mutex> lk(txTasks);
            flag = !tasks.empty();
        }

        running = false;
        for (auto thread : threads){
            thread->join();
        }
    }

    void Submit(std::shared_ptr<Runnable> r){
        std::lock_guard<std::mutex> lk(txTasks);
        tasks.push(r);
    }

private:
    void Run(){
        while (running){
            std::shared_ptr<Runnable> task;
            {
                std::lock_guard<std::mutex> lk(txTasks);
                if (tasks.empty()){
                    continue;
                }
                task = tasks.front();
                tasks.pop();
            }
            task->Run();
        }
    }

private:
    int                                        threadPoolSize;
    std::vector<std::shared_ptr<std::thread>>  threads;
    std::mutex                                 txThread;
    std::queue<std::shared_ptr<Runnable>>      tasks;
    std::mutex                                 txTasks;
    std::atomic_bool                           running;
};


// concrete runnable
class Analyzer:public Runnable{
public:
    Analyzer() = default;
    void Run(){
        std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
    }
private:
    static std::atomic_int cnt;
};
std::atomic_int Analyzer::cnt;

// thread pool test
void ThreadPoolTest(){
    std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(5);
    pool->Start();

    for (int i = 0; i < 100; ++i){
        std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
        pool->Submit(r);
    }

    // pool->Stop();
}

// main
int main(){
    ThreadPoolTest();
    return 0;
}

2. 条件变量方式

#include <iostream>
#include <string>

#include <memory>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <atomic>

// interface
class Runnable{
public:
    virtual void Run() = 0;
};

// thread pool
class FixedThreadPool{
public:
    FixedThreadPool(int size) :threadPoolSize(size){}

    ~FixedThreadPool(){
        if (running){
            Stop();
        }
    }

    void Start(){
        running = true;
        for (int i = 0; i < threadPoolSize; ++i){
            std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&FixedThreadPool::Run,this);
            std::lock_guard<std::mutex> lk(txThread);
            threads.push_back(t);
        }
    }

    void Stop(){
        bool flag = true;
        while (flag){
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::lock_guard<std::mutex> lk(txTasks);
            flag = !tasks.empty();
        }

        running = false;
        cond.notify_all();

        for (auto thread : threads){
            thread->join();
        }
    }

    void Submit(std::shared_ptr<Runnable> r){
        std::lock_guard<std::mutex> lk(txTasks);
        tasks.push(r);
        cond.notify_one();
    }

private:
    void Run(){
        while (running){
            std::shared_ptr<Runnable> task;
            /*{
                std::lock_guard<std::mutex> lk(txTasks);
                if (tasks.empty()){
                continue;
                }
                task = tasks.front();
                tasks.pop();
                }*/
            {
                std::unique_lock<std::mutex>  lk(txTasks);
                while (tasks.empty() && running){
                    cond.wait(lk);
                }
                if (!running){
                    break;
                }
                task = tasks.front();
                tasks.pop();
            }

            task->Run();
        }
    }

private:
    int                                        threadPoolSize;
    std::vector<std::shared_ptr<std::thread>>  threads;
    std::mutex                                 txThread;
    std::queue<std::shared_ptr<Runnable>>      tasks;
    std::mutex                                 txTasks;
    std::condition_variable                    cond;
    std::atomic_bool                           running;
};


// concrete runnable
class Analyzer:public Runnable{
public:
    Analyzer() = default;
    void Run(){
        std::cout << "analyze is running, cnt is " << cnt++ << std::endl;
    }
private:
    static std::atomic_int cnt;
};
std::atomic_int Analyzer::cnt;

// thread pool test
void ThreadPoolTest(){
    std::shared_ptr<FixedThreadPool> pool = std::make_shared<FixedThreadPool>(10);
    pool->Start();

    for (int i = 0; i < 100; ++i){
        std::shared_ptr<Runnable> r = std::make_shared<Analyzer>();
        pool->Submit(r);
    }

    // pool->Stop();
}

// main
int main(){
    ThreadPoolTest();
    return 0;
}
原文地址:https://www.cnblogs.com/walkinginthesun/p/10081802.html