利用C++实现了一个简易的线程池模型(基于Win32 SDK),方便使用多线程处理任务。共包含Thread.h、Thread.cpp、ThreadPool.h、ThreadPool.cpp四个源文件。功能相对简单,尚待完善。
Thread.h
1 // Thread.h: interface for the CThread class. 2 // 3 ////////////////////////////////////////////////////////////////////// 4 5 #if !defined(AFX_THREAD_H__9BC45342_F764_4104_B614_1CC8A1964171__INCLUDED_) 6 #define AFX_THREAD_H__9BC45342_F764_4104_B614_1CC8A1964171__INCLUDED_ 7 8 #if _MSC_VER > 1000 9 #pragma once 10 #endif // _MSC_VER > 1000 11 12 class CThread 13 { 14 public: 15 CThread(); 16 virtual ~CThread(); 17 18 public: 19 BOOL IsBusy(); 20 void Terminate(); 21 DWORD GetThreadId() const; 22 void Run(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter); 23 24 protected: 25 void _SetBusy(BOOL bBusy); 26 27 private: 28 void __Proc(); 29 static DWORD WINAPI __ThreadProc(LPVOID lpParameter); 30 31 protected: 32 LPTHREAD_START_ROUTINE m_pThreadProc; 33 LPVOID m_pParameter; 34 35 BOOL m_bBusy; 36 CRITICAL_SECTION m_csBusy; 37 38 HANDLE m_hEventWait; 39 HANDLE m_hEventExit; 40 41 HANDLE m_hThread; 42 DWORD m_dwThreadId; 43 }; 44 45 #endif // !defined(AFX_THREAD_H__9BC45342_F764_4104_B614_1CC8A1964171__INCLUDED_)
Thread.cpp
1 // Thread.cpp: implementation of the CThread class. 2 // 3 ////////////////////////////////////////////////////////////////////// 4 5 #include "StdAfx.h" 6 #include "Thread.h" 7 8 9 CThread::CThread() 10 : m_pThreadProc(0) 11 , m_pParameter(0) 12 , m_bBusy(0) 13 , m_hThread(0) 14 { 15 m_hEventWait = CreateEvent(0, TRUE, FALSE, NULL); 16 m_hEventExit = CreateEvent(0, TRUE, FALSE, NULL); 17 InitializeCriticalSection(&m_csBusy); 18 m_hThread = CreateThread(0, 0, __ThreadProc, this, 0, &m_dwThreadId); 19 } 20 21 CThread::~CThread() 22 { 23 Terminate(); 24 WaitForSingleObject(m_hThread, INFINITE); 25 CloseHandle(m_hThread); 26 DeleteCriticalSection(&m_csBusy); 27 CloseHandle(m_hEventWait); 28 CloseHandle(m_hEventExit); 29 } 30 31 void CThread::Terminate() 32 { 33 SetEvent(m_hEventExit); 34 } 35 36 BOOL CThread::IsBusy() 37 { 38 EnterCriticalSection(&m_csBusy); 39 BOOL bRet = m_bBusy; 40 LeaveCriticalSection(&m_csBusy); 41 return bRet; 42 } 43 44 void CThread::_SetBusy(BOOL bBusy) 45 { 46 EnterCriticalSection(&m_csBusy); 47 m_bBusy = bBusy; 48 LeaveCriticalSection(&m_csBusy); 49 } 50 51 DWORD CThread::GetThreadId() const 52 { 53 return m_dwThreadId; 54 } 55 56 void CThread::Run(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter) 57 { 58 if (!IsBusy() && lpStartAddress) 59 { 60 _SetBusy(TRUE); 61 m_pThreadProc = lpStartAddress; 62 m_pParameter = lpParameter; 63 SetEvent(m_hEventWait); 64 } 65 } 66 67 DWORD WINAPI CThread::__ThreadProc(LPVOID lpParameter) 68 { 69 CThread* pThis = (CThread*)lpParameter; 70 pThis->__Proc(); 71 72 return 0; 73 } 74 75 void CThread::__Proc() 76 { 77 for (;;) 78 { 79 if (!m_pThreadProc) 80 { 81 HANDLE hEvents[2] = {m_hEventExit, m_hEventWait}; 82 DWORD dwRet = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE); 83 if (dwRet - WAIT_OBJECT_0 == 0) break; 84 } 85 m_pThreadProc(m_pParameter); 86 m_pThreadProc = NULL; 87 m_pParameter = NULL; 88 ResetEvent(m_hEventWait); 89 _SetBusy(FALSE); 90 } 91 }
ThreadPool.h
1 // ThreadPool.h: interface for the CThreadPool class. 2 // 3 ////////////////////////////////////////////////////////////////////// 4 5 #if !defined(AFX_THREADPOOL_H__E24F5439_D623_4511_A874_40CC9417DDEE__INCLUDED_) 6 #define AFX_THREADPOOL_H__E24F5439_D623_4511_A874_40CC9417DDEE__INCLUDED_ 7 8 #if _MSC_VER > 1000 9 #pragma once 10 #endif // _MSC_VER > 1000 11 12 #pragma warning(push) 13 #pragma warning(disable: 4786) 14 15 #include <queue> 16 #include <vector> 17 #include <utility> 18 19 #pragma warning(pop) 20 21 class CThread; 22 class CThreadPool 23 { 24 public: 25 CThreadPool(size_t nMaxCount); 26 virtual ~CThreadPool(); 27 28 public: 29 BOOL IsBusy(); 30 BOOL Wait(DWORD dwMilliseconds); 31 void Run(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter); 32 33 private: 34 void __Proc(); 35 static DWORD WINAPI __ThreadProc(LPVOID lpParameter); 36 37 protected: 38 HANDLE m_hThread; 39 HANDLE m_hEventExit; 40 41 CRITICAL_SECTION m_csFunc; 42 43 std::vector<CThread*> m_vThreads; 44 std::queue< std::pair<LPTHREAD_START_ROUTINE, LPVOID> > m_qFuncs; 45 }; 46 47 #endif // !defined(AFX_THREADPOOL_H__E24F5439_D623_4511_A874_40CC9417DDEE__INCLUDED_)
ThreadPool.cpp
1 // ThreadPool.cpp: implementation of the CThreadPool class. 2 // 3 ////////////////////////////////////////////////////////////////////// 4 5 #include "StdAfx.h" 6 #include "Thread.h" 7 #include <algorithm> 8 #include "ThreadPool.h" 9 10 11 CThreadPool::CThreadPool(size_t nMaxCount) 12 { 13 for (size_t i(0); i < nMaxCount; ++i) 14 { 15 m_vThreads.push_back(new CThread); 16 } 17 InitializeCriticalSection(&m_csFunc); 18 m_hEventExit = CreateEvent(NULL, TRUE, FALSE, NULL); 19 m_hThread = CreateThread(0, 0, __ThreadProc, this, 0, 0); 20 } 21 22 CThreadPool::~CThreadPool() 23 { 24 SetEvent(m_hEventExit); 25 WaitForSingleObject(m_hThread, INFINITE); 26 CloseHandle(m_hThread); 27 for (std::vector<CThread*>::iterator i(m_vThreads.begin()) 28 ; i != m_vThreads.end(); ++i) 29 { 30 delete *i; 31 } 32 m_vThreads.clear(); 33 DeleteCriticalSection(&m_csFunc); 34 } 35 36 DWORD WINAPI CThreadPool::__ThreadProc(LPVOID lpParameter) 37 { 38 CThreadPool* pThis = (CThreadPool*)lpParameter; 39 pThis->__Proc(); 40 41 return 0; 42 } 43 44 void CThreadPool::__Proc() 45 { 46 for (;WaitForSingleObject(m_hEventExit, 100) != WAIT_OBJECT_0;) 47 { 48 EnterCriticalSection(&m_csFunc); 49 if (!m_qFuncs.empty()) 50 { 51 for (std::vector<CThread*>::iterator i(m_vThreads.begin()) 52 ; i != m_vThreads.end(); ++i) 53 { 54 if (!(*i)->IsBusy() && !m_qFuncs.empty()) 55 { 56 std::pair<LPTHREAD_START_ROUTINE, LPVOID>& pFunc = m_qFuncs.front(); 57 (*i)->Run(pFunc.first, pFunc.second); 58 m_qFuncs.pop(); 59 } 60 } 61 } 62 LeaveCriticalSection(&m_csFunc); 63 } 64 } 65 66 void CThreadPool::Run(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter) 67 { 68 EnterCriticalSection(&m_csFunc); 69 std::pair<LPTHREAD_START_ROUTINE, LPVOID> pFunc(std::make_pair(lpStartAddress, lpParameter)); 70 m_qFuncs.push(pFunc); 71 LeaveCriticalSection(&m_csFunc); 72 } 73 74 BOOL CThreadPool::IsBusy() 75 { 76 BOOL bRet(FALSE); 77 EnterCriticalSection(&m_csFunc); 78 if (m_qFuncs.empty()) 79 { 80 for (std::vector<CThread*>::iterator i(m_vThreads.begin()) 81 ; i != m_vThreads.end(); ++i) 82 { 83 if ((*i)->IsBusy()) 84 { 85 bRet = TRUE; 86 break; 87 } 88 } 89 } 90 else bRet = TRUE; 91 LeaveCriticalSection(&m_csFunc); 92 return bRet; 93 } 94 95 BOOL CThreadPool::Wait(DWORD dwMilliseconds) 96 { 97 DWORD dwTick = GetTickCount(); 98 for (;IsBusy();) 99 { 100 Sleep(1); 101 if (GetTickCount() - dwTick > dwMilliseconds) 102 { 103 break; 104 } 105 } 106 return !IsBusy(); 107 }
使用示例:
1 // Test.cpp : Defines the entry point for the console application. 2 // 3 4 #include "StdAfx.h" 5 #include "ThreadPool.h" 6 7 CRITICAL_SECTION cs; 8 9 using namespace std; 10 11 DWORD WINAPI ThreadProc(LPVOID lpParameter) 12 { 13 EnterCriticalSection(&cs); 14 cout << LPCSTR(lpParameter) << endl; 15 LeaveCriticalSection(&cs); 16 return 0; 17 } 18 19 int _tmain(int argc, TCHAR* argv[], TCHAR* envp[]) 20 { 21 InitializeCriticalSection(&cs); 22 23 CThreadPool *pThrealPool = new CThreadPool(2); 24 pThrealPool->Run(ThreadProc, "001"); 25 pThrealPool->Run(ThreadProc, "002"); 26 pThrealPool->Run(ThreadProc, "003"); 27 pThrealPool->Run(ThreadProc, "004"); 28 pThrealPool->Run(ThreadProc, "005"); 29 pThrealPool->Run(ThreadProc, "006"); 30 pThrealPool->Run(ThreadProc, "007"); 31 32 pThrealPool->Wait(INFINIE); 33 delete pThrealPool; 34 35 DeleteCriticalSection(&cs); 36 37 return nRetCode; 38 }
运行结果:
源代码下载:http://t.cn/RvnJe9S