【造轮子】MFC实现BlockingQueue

最近任务需要在MFC下做多线程生产者消费者模式的东西,我找了半天貌似MFC没有类似Java里面BlockingQueue那样的工具(也许是我手残没找到)。

网上好像也有很多大佬去实现这个。但是我没仔细去找,看了看一些资料就想着造个轮子玩玩。

实现如下:

主要是利用CCriticalSection保护内置的std::list,然后用CEvent来实现生产者消费者的同步。

参考资料:http://stackoverflow.com/questions/6683356/c-templated-producer-consumer-blockingqueue-unbounded-buffer-how-do-i-end-el

接口文件:IBlockingQueue.h

 1 #pragma once
 2 
 /// Abstract interface of a double-ended producer/consumer queue.
 ///
 /// Only the operations are declared here; blocking and locking behaviour is
 /// defined by the implementing class (CBlockingQueue in BlockingQueue.h).
 ///
 /// @tparam T element type, passed and returned by value.
 template <class T>
 class IBlockingQueue
 {
 public:
     /// Virtual so that deleting through an IBlockingQueue<T>* runs the
     /// destructor of the implementing class.
     virtual ~IBlockingQueue() {}

     /// @return the number of elements currently stored.
     virtual int size() = 0;

     /// Removes and returns the first element.
     virtual T pop_front() = 0;

     /// Removes and returns the last element.
     virtual T pop_back() = 0;

     /// Inserts @p val before the first element.
     virtual void push_front(T val) = 0;

     /// Appends @p val after the last element.
     virtual void push_back(T val) = 0;

     /// @return true when no element is stored.
     virtual bool empty() = 0;

     /// Marks the queue as stopped, i.e. no more data is going to arrive.
     virtual void stop() = 0;

     /// @return true once stop() has been called.
     virtual bool is_stop() = 0;
 };

实现文件:BlockingQueue.h

  1 #pragma once
  2 
  3 #include <afxmt.h>
  4 #include <list>
  5 
  6 #include "Bridge.h"
  7 
  8 #include "StoppingException.h"
  9 
 10 // 参考资料:http://stackoverflow.com/questions/6683356/c-templated-producer-consumer-blockingqueue-unbounded-buffer-how-do-i-end-el
 11 
 12 template <class T>
 13 class CBlockingQueue : public IBlockingQueue<T>
 14 {
 15     CCriticalSection m_cs; // 保护 m_lst
 16     CEvent m_emptyEvent; // m_lst 为空 就 reset,m_lst 不为空 就 set 放行
 17     std::list<T> m_lst;
 18     bool m_bStop;
 19 public:
 20     CBlockingQueue();
 21     CBlockingQueue(const CBlockingQueue<T>& obj);
 22     CBlockingQueue<T>& operator=(const CBlockingQueue<T>& obj);
 23     virtual int size();
 24     virtual T pop_front();
 25     virtual T pop_back();
 26     virtual void push_front(T val);
 27     virtual void push_back(T val);
 28     virtual bool empty();
 29     virtual void stop();
 30     virtual bool is_stop();
 31 };
 32 
 33 template <class T>
 34 CBlockingQueue<T>::CBlockingQueue()
 35     : m_emptyEvent(FALSE, TRUE, NULL, NULL), m_bStop(false) // 初始为RESET,不自动RESET
 36 {}
 37 
 38 template <class T>
 39 CBlockingQueue<T>::CBlockingQueue(const CBlockingQueue<T>& obj)
 40     : m_emptyEvent(FALSE, TRUE, NULL, NULL), m_bStop(false) // 初始为RESET,不自动RESET
 41 {
 42     m_cs.Lock();
 43     obj.m_cs.Lock();
 44     m_lst = obj.m_lst;
 45     obj.m_cs.Unlock();
 46     m_cs.Unlock();
 47 }
 48 
 49 template <class T>
 50 CBlockingQueue<T>& CBlockingQueue<T>::operator = (const CBlockingQueue<T>& obj)
 51 {
 52     m_cs.Lock();
 53     obj.m_cs.Lock();
 54     m_lst = obj.m_lst;
 55     obj.m_cs.Unlock();
 56     m_cs.Unlock();
 57     return *this;
 58 }
 59 
 60 template <class T>
 61 int CBlockingQueue<T>::size()
 62 {
 63     m_cs.Lock();
 64     int sz = 0;
 65     sz = m_lst.size();
 66     m_cs.Unlock();
 67     return sz;
 68 }
 69 template <class T>
 70 T CBlockingQueue<T>::pop_front()
 71 {
 72     T val;
 73     bool done = false; // val 是否从 m_lst 中取出
 74     while (!done)
 75     {
 76         m_cs.Lock();
 77         if (!m_bStop) // 如果另一个线程在m_lst非空的时候调用了stop,然后那个线程结束了。那么调用pop_front的线程就无限等待了。
 78         {
 79             m_cs.Unlock();
 80             ::WaitForSingleObject(m_emptyEvent.m_hObject, INFINITE);
 81         }
 82         else
 83             m_cs.Unlock();
 84         m_cs.Lock();
 85         if (m_lst.empty())
 86         {
 87             if (m_bStop) // 先检测 empty 再检测 stop,因为 stop 为 true 的时候 m_lst 中仍然可能有数据没取出。要知道 stop 是为了提示其他线程没有更多的数据了,而不是为了强行中断其他线程获取数据。
 88             {
 89                 m_cs.Unlock();
 90                 throw StoppingException();
 91             }
 92         }
 93         else
 94         {
 95             val = m_lst.front();
 96             m_lst.pop_front();
 97             if (m_lst.empty())
 98                 m_emptyEvent.ResetEvent();
 99             done = true;
100         }
101         m_cs.Unlock();
102     }
103     return val;
104 }
105 template <class T>
106 T CBlockingQueue<T>::pop_back()
107 {
108     T val;
109     bool done = false; // val 是否从 m_lst 中取出
110     while (!done)
111     {
112         m_cs.Lock();
113         if (!m_bStop)
114         {
115             m_cs.Unlock();
116             ::WaitForSingleObject(m_emptyEvent.m_hObject, INFINITE);
117         }
118         else
119             m_cs.Unlock();
120         m_cs.Lock();
121         if (m_lst.empty())
122         {
123             if (m_bStop)
124             {
125                 m_cs.Unlock();
126                 throw StoppingException();
127             }
128         }
129         else
130         {
131             val = m_lst.back();
132             m_lst.pop_back();
133             if (m_lst.empty())
134                 m_emptyEvent.ResetEvent();
135             done = true;
136         }
137         m_cs.Unlock();
138 
139     }
140     return val;
141 }
142 template <class T>
143 void CBlockingQueue<T>::push_front(T val)
144 {
145     m_cs.Lock();
146     m_lst.push_front(val);
147     m_emptyEvent.SetEvent();
148     m_cs.Unlock();
149 }
150 template <class T>
151 void CBlockingQueue<T>::push_back(T val)
152 {
153     m_cs.Lock();
154     m_lst.push_back(val);
155     m_emptyEvent.SetEvent();
156     m_cs.Unlock();
157 }
158 template <class T>
159 bool CBlockingQueue<T>::empty()
160 {
161     m_cs.Lock();
162     bool bEmpty = m_lst.empty();
163     m_cs.Unlock();
164     return bEmpty;
165 }
166 template <class T>
167 bool CBlockingQueue<T>::is_stop()
168 {
169     m_cs.Lock();
170     bool bStop = m_bStop;
171     m_cs.Unlock();
172     return bStop;
173 }
174 template <class T>
175 void CBlockingQueue<T>::stop()
176 {
177     m_cs.Lock();
178     m_bStop = true;
179     m_emptyEvent.SetEvent();
180     m_cs.Unlock();
181 }

实现文件:BlockingQueue.cpp

#include "BlockingQueue.h"

// Out-of-line constructor of StoppingException, the exception that
// CBlockingQueue::pop_front()/pop_back() throw when the queue is stopped and
// empty. The body is empty.
// NOTE(review): the class declaration is not shown here (presumably in
// StoppingException.h, which BlockingQueue.h includes); Consume() reads a
// member named `msg`, so confirm that it is initialised by the declaration.
StoppingException::StoppingException() {}

// Out-of-line destructor; nothing to release here.
StoppingException::~StoppingException() {}

测试文件:MyApp.h

#pragma once

#include <afxwin.h>

// Application class of the test program. It only overrides InitInstance(),
// which creates the main window and starts the producer/consumer threads.
class CMyApp : public CWinApp
{
public:
    // Application start-up hook; defined in MyApp.cpp.
    virtual BOOL InitInstance();
};

测试文件:MyApp.cpp

#include "MyApp.h"

#include "BlockingQueue.h"

using namespace std;

// Main frame window of the test application. Closing it shuts down the
// worker threads (see OnClose()).
class CMainWindow :
    public CFrameWnd
{
public:
    // Creates the underlying window with a fixed title, style and rectangle.
    CMainWindow();
    // Declares the message map that is defined with BEGIN_MESSAGE_MAP below.
    // NOTE(review): this macro may change the access level of the members
    // declared after it; confirm before reordering the declarations.
    DECLARE_MESSAGE_MAP()
    // WM_CLOSE handler: stops the queue, waits for the consumer threads and
    // frees the global objects before the window is closed.
    afx_msg void OnClose();
};

// Creates the frame window: overlapped, with caption, system menu, minimize
// box and a sizing border, initially spanning (32, 64) to (352, 304).
CMainWindow::CMainWindow()
{
    const DWORD dwFrameStyle = WS_OVERLAPPED | WS_CAPTION | WS_SYSMENU
        | WS_MINIMIZEBOX | WS_THICKFRAME;
    const CRect rcFrame(32, 64, 352, 304);

    Create(NULL, _T("The Hello Application"), dwFrameStyle, rcFrame);
}

// The global application object.
CMyApp myApp;

// Queue shared by all producer and consumer threads; allocated here and
// deleted in CMainWindow::OnClose().
IBlockingQueue<int>* pBQ = new CBlockingQueue<int>();

#define NUM_PRODUCER 9 // number of producer threads
#define NUM_CONSUMER 5 // number of consumer threads

CWinThread* pThreadProducer[NUM_PRODUCER]; // producer thread objects
CWinThread* pThreadConsumer[NUM_CONSUMER]; // consumer thread objects
HANDLE hConsumer[NUM_CONSUMER]; // consumer thread handles, waited on in OnClose()

// 生产
// Producer thread procedure.
//
// Runs ten rounds; in each round the round index is pushed once to the back
// and once to the front of the shared queue, then the thread sleeps for one
// second.
//
// pParam: unused (the threads are started with NULL).
// Returns 0 as the thread exit code.
UINT Produce(LPVOID pParam)
{
    UNREFERENCED_PARAMETER(pParam);

    for (int i = 0; i < 10; ++i)
    {
        TRACE(_T("Producer[%d]Producing: %d\n"), ::GetCurrentThreadId(), i);
        pBQ->push_back(i);
        TRACE(_T("Producer[%d]Producing: %d\n"), ::GetCurrentThreadId(), i);
        pBQ->push_front(i);
        TRACE(_T("Producer[%d]Sleeping...\n"), ::GetCurrentThreadId());
        ::Sleep(1000);
    }
    TRACE(_T("Producer[%d]Exiting...\n"), ::GetCurrentThreadId());
    return 0;
}
// 消费
// Consumer thread procedure.
//
// Alternately takes one element from the front and one from the back of the
// shared queue, blocking while it is empty. The loop ends when the queue
// throws StoppingException, i.e. when it has been stopped and drained.
//
// pParam: unused (the threads are started with NULL).
// Returns 0 as the thread exit code.
UINT Consume(LPVOID pParam)
{
    UNREFERENCED_PARAMETER(pParam);

    try {
        while (true)
        {
            TRACE(_T("Consumer[%d]Waiting...\n"), ::GetCurrentThreadId());
            int val = pBQ->pop_front();
            TRACE(_T("Consumer[%d]Consuming: %d\n"), ::GetCurrentThreadId(), val);
            val = pBQ->pop_back();
            TRACE(_T("Consumer[%d]Consuming: %d\n"), ::GetCurrentThreadId(), val);
        }
    }
    catch (const StoppingException& e) // normal end of work, not an error
    {
        // NOTE(review): the type of e.msg is not visible here; confirm that
        // it matches the %s conversion of the build (TCHAR string).
        TRACE(_T("Consumer[%d]%s...\n"), ::GetCurrentThreadId(), e.msg);
    }
    TRACE(_T("Consumer[%d]Exiting...\n"), ::GetCurrentThreadId());
    return 0;
}
// 主线程(UI)
// Application start-up (runs on the UI thread).
//
// Creates and shows the main window, pre-fills the shared queue with the
// values 0..99, then starts NUM_CONSUMER consumer threads followed by
// NUM_PRODUCER producer threads.
//
// Returns TRUE on success, FALSE if a worker thread could not be created.
BOOL CMyApp::InitInstance()
{
    m_pMainWnd = new CMainWindow;
    m_pMainWnd->ShowWindow(m_nCmdShow);
    m_pMainWnd->UpdateWindow();

    // Initial shared data.
    for (int i = 0; i < 100; ++i)
    {
        pBQ->push_back(i);
    }

    // Create the consumer threads. They are created suspended so that
    // m_bAutoDelete can be cleared before the thread starts running: the
    // CWinThread objects are deleted explicitly in CMainWindow::OnClose().
    for (int i = 0; i < NUM_CONSUMER; ++i)
    {
        CWinThread* pThread = ::AfxBeginThread(Consume, NULL, THREAD_PRIORITY_NORMAL, 0, CREATE_SUSPENDED);
        if (pThread == NULL)
            return FALSE; // thread creation failed; do not dereference NULL
        pThread->m_bAutoDelete = FALSE;
        pThreadConsumer[i] = pThread;
        hConsumer[i] = pThread->m_hThread;
    }
    // Start the consumer threads.
    for (int i = 0; i < NUM_CONSUMER; ++i)
    {
        pThreadConsumer[i]->ResumeThread();
    }

    // Create the producer threads, suspended for the same reason as above.
    for (int i = 0; i < NUM_PRODUCER; ++i)
    {
        CWinThread* pThread = ::AfxBeginThread(Produce, NULL, THREAD_PRIORITY_NORMAL, 0, CREATE_SUSPENDED);
        if (pThread == NULL)
            return FALSE; // thread creation failed; do not dereference NULL
        pThread->m_bAutoDelete = FALSE;
        pThreadProducer[i] = pThread;
    }
    // Start the producer threads.
    for (int i = 0; i < NUM_PRODUCER; ++i)
    {
        pThreadProducer[i]->ResumeThread();
    }

    return TRUE;
}

// Message map of CMainWindow (base class CFrameWnd): ON_WM_CLOSE() is the
// entry for the window's OnClose() handler.
BEGIN_MESSAGE_MAP(CMainWindow, CFrameWnd)
    ON_WM_CLOSE()
END_MESSAGE_MAP()

// 退出主线程
void CMainWindow::OnClose()
{
    pBQ->stop();

    ::WaitForMultipleObjects(NUM_CONSUMER, hConsumer, TRUE, INFINITE);

    for (int i = 0; i < NUM_CONSUMER; ++i)
    {
        delete pThreadConsumer[i];
    }

    for (int i = 0; i < NUM_PRODUCER; ++i)
    {
        delete pThreadProducer[i];
    }

    delete pBQ;

    CFrameWnd::OnClose();
}
原文地址:https://www.cnblogs.com/qrlozte/p/6675006.html