半同步半异步线程池

1、半同步半异步线程池

  在处理大量并发任务的时候,如果一个请求一个线程来处理任务,大量的线程创建将会消耗过多的资源,还增加了线程上下文切换的开销,而通过线程池技术可以很好的解决这个问题。

  线程池技术通过在系统中预先创建一些线程,当任务请求来到的时候,从线程池中分配一个预先创建的线程来处理任务,在任务完成之后,线程不会销毁,还可以重用,等待下次任务的到来。这样就避免了大量线程的创建和销毁,从而节省系统资源,对于多核处理器,线程可以分配到多个cpu,提高并发效率。每个线程处理任务,独立阻塞,防止主线程被阻塞,导致其他请求得不到响应。

  线程池分为半同步半异步线程池和领导者追随者线程池,半同步半异步线程池在实现上更加的简单,使用方便,也是最常用的线程池。

  

  第一层为同步服务层,处理来自上层的任务请求,上层的任务请求是并发的,并不会马上处理,而是将这些任务放到一个同步队列中,等待处理。

  第二层为同步队列,来自上层的任务都会加到队列中等待处理。

  第三层是异步服务层,这一层中有多个线程同事处理排队任务中的任务,异步服务层从同步队列中取出任务并行的处理任务。

  这种三层的结构可以最大程度的处理上层的并发请求,对于上层来说,只需要往队列中添加任务,不关心谁处理任务,主线程也不会阻塞,对于任务的具体处理,通过异步服务层的多线程异步并行来完成。

2、线程池

  中间层是一个同步队列,允许多个线程同时添加或者取出任务,并且要保证操作过程是线程安全的。线程池有两个活动,一个是往同步队列中添加任务,另一个是从同步队列中取出任务。

  

  启动线程池会启动一定数量的线程,这些线程属于异步层,主要用来并行处理队列中任务,如果队列中的任务为空,则等待任务的到来,如果有任务,则从等待的线程中唤醒一定数量的线程来执行任务。

  需要注意的是,队列中一定要限制任务的数量,避免任务过多导致内存暴涨。

3、同步队列

  同步队列保存任务,为同步层提供添加新任务的接口,为异步层提供取任务的接口。同步队列中的锁用来线程同步,条件变量用来实现线程通信。

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

template<typename T>
class SynQueue
{
public:
    SynQueue(int maxsize) : m_maxSize(maxsize), m_needStop(false) { }

    //添加事件,左值拷贝和右值移动
    void Put(const T&x)
    {
        //调用private内部接口Add
        Add(x);
    }

    void Put(T &&x)
    {
        Add(x);
    }

    //从队列中取事件,取所有事件
    void Take(std::list<T> &list)
    {
        //有wait方法必须用unique_lock
        //unique_lock有定时等待等功能,lock_guard就仅仅是RAII手法的互斥锁
        //但unique_lock的性能稍低于lock_guard
        std::unique_lock<std::mutex> locker(m_mutex);
        //满足条件则唤醒,不满足阻塞
        m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });

        if (m_needStop)
        {
            return;
        }

        std::cout << "取出所有任务..." << std::endl;
        list = std::move(m_queue);
        //唤醒其他阻塞在互斥锁的线程
        m_notFull.notify_one();
    }

    //取一个事件
    void Take(T &t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this]    { return m_needStop || NotEmpty(); });
        if (m_needStop)
        {
            return;
        }
        
        std::cout << "取出一个任务..." << std::endl;
        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    //停止所有线程在同步队列中的读取
    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }

        std::cout << "停止任务执行..." << std::endl;
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    //队列为空
    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    //队列为满
    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    //队列大小
    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

private:

    //往队列里添加事件,事件是范型的,c++11我们可以把函数通过std::function封装为对象。
    template<typename F>
    void Add(F &&x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] { return m_needStop || NotFull(); });
        if (m_needStop)
        {
            return;
        }
        
        std::cout << "添加一个任务..." << std::endl;
        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

    //队列未满
    bool NotFull() const
    {
        bool full = m_queue.size() >= m_maxSize;
        if (full)
        {
            std::cout << "任务池满,等待取出任务..." << "线程ID:" << std::this_thread::get_id() << std::endl;
        }

        return !full;
    }

    //队列不为空
    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
        {
            std::cout << "任务池为空,等待添加任务..." << "线程ID:" << std::this_thread::get_id() << std::endl;
        }

        return !empty;
    }

private:
    std::mutex m_mutex;                                  //互斥锁
    std::list<T> m_queue;                                //队列,存放任务
    std::condition_variable m_notEmpty;            //队列不为空的条件变量
    std::condition_variable m_notFull;              //队列不为满的条件变量
    size_t m_maxSize;                                   //任务队列最大长度 
    bool m_needStop;                                    //终止标识
};

4、线程池

  一个完整的线程池包括三种:同步服务层、排队层和异步服务层,其实也是一种消费者生产者模式,同步层是生产者,不断将新任务丢到队列中,因此线程池需要提供一个添加任务的接口;消费者是异步层,具体是从线程池中预先创建的线程去处理队列中的任务。线程池还提供一个停止的接口,在需要的时候,停止线程池的运行。

#include "SyncQueue.hpp"
#include <functional>
#include <thread>
#include <atomic>
#include <memory>

const int MaxTaskCount = 10;

class ThreadPool
{
public:

    //规定任务类型为void(),我们可以通过c++11 不定参数模板来实现一个可接受任何函数的范型函数模板,这样就是一个可以接受任何任务的任务队列了。
    using Task = std::function<void()>;

    //hardware_concurrency检测硬件性能,给出默认线程数
    ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
    {
        //初始化线程,并通过shared_ptr来管理
        Start(numThreads);
    }

    //销毁线程池
    ~ThreadPool(void)
    {
        Stop();
    }

    //终止所有线程,call_once保证函数只调用一次
    void Stop()
    {
        std::call_once(m_flag, [this] { StopThreadGroup(); });
    }

    //添加任务,普通版本和右值引用版本
    void AddTask(const Task& task)
    {
        m_queue.Put(task);
    }

    void AddTask(Task && task)
    {
        m_queue.Put(std::forward<Task>(task));
    }

private:

    //停止线程池
    void StopThreadGroup()
    {
        m_queue.Stop();
        m_running = false;
        for (auto thread : m_threadgroup)
        {
            if (thread)
                thread->join();
        }

        m_threadgroup.clear();
    }

    void Start(int numThreads)
    {
        m_running = true;
        std::cout << "初始化线程池..." << std::endl;
        for (int i = 0; i < numThreads; ++i)
        {
            //智能指针管理,并给出构建线程的参数,线程调用函数和参数
            m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
        }
    }

    //一次取出队列中全部事件
    void RunInThread_list()
    {
        std::cout << "取出所有任务并执行..." << std::endl;
        while (m_running)
        {
            std::list<Task> list;
            m_queue.Take(list);
            for (auto &task : list)
            {
                if (!m_running)
                {
                    return;
                }

                task();
            }
        }
    }

    //一次只取一个事件
    void RunInThread()
    {
        while (m_running)
        {
            Task task;
            if (!m_running)
                return;

            m_queue.Take(task);

            //执行任务
            task();
        }
    }

private:
    //线程池
    std::list<std::shared_ptr<std::thread>> m_threadgroup;
    //任务队列
    SynQueue<Task>m_queue;
    //原子布尔值
    std::atomic_bool m_running;
    //辅助变量->call_once
    std::once_flag m_flag;
};

  线程池中,有三个成员变量,一个是线程组,线程组中的线程是预先创建的,具体创建多少个,由外部决定,一般是CPU核数的线程达到最优的效率,线程组循环从同步队列中取出任务并执行,如果任务为空,线程组等待。另一个成员变量是同步队列,不仅用来做线程同步,还用来限制同步队列的上限。第三个成员变量是用来停止线程池的,为了保证线程安全,用了原子变量atomic_bool。

5、线程池的使用

  通过简单的例子来使用半同步半异步线程池。线程池创建两个线程,通过外部线程不断地想线程池添加新任务,线程池内部的线程将会并行的处理同步队列中的任务。

#include <thread>    
#include <iostream> 

#include "ThreadPool.hpp"

using namespace std;

int main(int argc, char *argv[])
{
    ThreadPool pool;

    //pool.Start(2);
    std::thread thd1([&pool]
    {
        for (int i = 0; i < 10; i++)
        {
            auto thdId = this_thread::get_id();
            cout << "线程 1 添加任务..." << endl;
            pool.AddTask([thdId] {cout << "线程1任务,线程ID:" << thdId << "	执行线程:" << this_thread::get_id() << endl; });
        }
    });

    std::thread thd2([&pool]
    {
        for (int i = 0; i < 10; i++)
        {
            auto thdId = this_thread::get_id();
            cout << "线程 2 添加任务..." << endl;
            pool.AddTask([thdId]    { cout << "线程2任务,线程ID:" << thdId << "	执行线程:" << this_thread::get_id() << endl;  });
        }
    });

    thd1.join();
    thd2.join();

    //等待线程池执行30s
    cout << "主线程休眠等待..." << endl;
    this_thread::sleep_for(std::chrono::seconds(30));
    pool.Stop();

    return EXIT_SUCCESS;
}
原文地址:https://www.cnblogs.com/ChinaHook/p/7967001.html