第11章 Windows线程池(2)_Win2008及以上的新线程池

11.2 Win2008以上的新线程池

(1)传统线程池的优缺点:

  ①传统Windows线程池调用简单,使用方便(有时只需调用一个API即可)

  ②这种简单也带来负面问题,如接口过于简单,无法更多去控制线程池的行为。

(2)Windows2008新线程池及API

线程池对象

传统API

Win2008及以上平台新API

普通任务线程池

QueueUserWorkItem

CreateThreadpoolWork

TrySubmitThreadpoolWaitCallbacks

SubmitThreadpoolWork

CloseThreadpoolWork

计时器线程池

CreateTimerQueue(创建线程池)

CreateThreadpoolTimer

CreateTimerQueueTimer(创建计时器)

SetThreadpoolTimer

ChangeTimerQueueTimer

IsThreadpoolTimerSet

DeleteTimerQueueTimer

WaitForThreadpoolTimerCallbacks

DeteTimerQueueEx

CloseThreadpoolTimer

同步对象等待线程池

RegisterWaitForSingleObject

CreateThreadpoolWait

UnregisterWaitEx

SetThreadpoolWait

WaitForThreadpoolWaitCallbacks

CloseThreadpoolWait

完成端口线程池

BindIoCompletionCallback

CreateThreadpoolIo

StartThreadpoolIo

CancelThreadpoolIo

WaitForThreadpoolIoCallbacks

CloseThreadpoolIo

(3)新线程池辅助API

功能

辅助API

线程池清理器

CreateThreadpoolCleanupGroup

CloseThreadpoolCleanupGroup

ClosethreadpoolCleanupGroupMembers

线程池控制函数

CreateThreadpool

CloseThreadpool

SetThreadpoolThreadMaximum

SetThreadpoolMinimum

线程池环境设备

InitializeThreadpoolEnviroment

DestroyThreadpoolEnvironment

SetThreadpoolCallbackCleanupGroup

SetThreadCallbackLibrary

SetThreadpoolbackpool

SetThreadpoolCallbackRunsLong

显式设定一个长时间调用的回调线程池函数

CallbackMayRunLong

清理及回调方法

DisassociateCurrentThreadFromCallback

FreeLibraryWhenCallbackReturns

LeaveCriticalSectionWhenCallbackReturns

ReleaseMutexWhenCallbackReturns

ReleaseSemaphoreWhenCallbackReturns

SetEventWhenCallbackReturns

(3)Win2008新线程池的一般编程模型

11.2.1 以异步方式调用函数

(1)单步使用线程池——TrySubmitThreadpoolCallback函数提交异步函数给线程池

参数

描述

PTP_SIMPLE_CALLBACK pfnCallback

回调函数,其原型为:

VOID NTAPI SimpleCallback

(PTP_CALLBACK_INSTANCE pInstance, //不透明的参数,调用回调函数时,由Windows自动传入,可用于继续传给其他回调终止操作的函数使用,如LeaveCriticalSectionWhenCallbackReturns。。

PVOID pvContext);//其中的pInstanc见“回调函数终止时行为”那部分的内容

PVOID pvContext

回调函数的额外参数。

PTP_CALLBACK_ENVIRON pcbe

回调环境,用来对线程池进行定制的参数。(注意,这个结构体内部与一个线程池关联,该参数为NULL时,会创建默认线程池,否则我们可以用CreateThreadpool来创建一个线程池,并与这个结构体关联起来)

备注:当为pcbe为NULL时,该函数被调用时系统会在进程中创建一个默认的线程池,并让线程池中的一个线程来调用指定的回调函数。该函数(内部调用PostQueuedCompletionStatus)将一个工作项添加到队列中。

(2)两步使用线程池

  ①CreateThreadpoolWork创建“工作项”。注意与之前所说的那些工作项不同。这里的工作项是个对象,不能简单理解成是一个回调函数。而是关联了回调函数及回调环境的一个对象了!

参数

描述

PTP_WORK_CALLBACK

pfnWorkHandler

工作项要关联的回调函数,其原型为

VOID CALLBACK WorkCallback(

PTP_CALLBACK_INSTNACE,PVOID pvContext,PTP_WORK work);

PVOID pvContext

回调函数的额外参数

PTP_CALLBACK_ENVIRON pcbe

回调环境,见《TrySubmitThreadpoolCallback函数》的说明

  ②SubmitThreadpoolWork提交这个工作项给线程池。结束后还可以关闭该工作项。

    SubmitThreadpoolWork(PTP_WORK pWork);

  【注意】

  ★WaitForThreadpoolWorkCallbacks(pWork,bCancelPendingCallbacks)取消己提交的工作项或等待工作项处理完成

    bCancelPendingCallbacks为FALSE,会等待工作项处理完成,函数再返回。为TRUE时会试图取消pWork这个工作项。

    如果用一个PTP_WORK提交了多个工作项,当bCancelPendingCallbacks为FALSE时则会等待所有的己提交的工作项,如果为TRUE只要等待当前正在运行的工作项完成时就返回。

  ★CloseThreadpoolWork关闭一个工作项

【Batch示例程序】批处理程序

 

//主程序

/*************************************************************************
Module:  Batch.cpp
Notices: Copyright(c) 2008 Jeffrey Ritcher & Christophe Nasarre
*************************************************************************/

#include "....CommonFilesCmnHdr.h"
#include "resource.h"

#include <tchar.h>
#include <strsafe.h>

//////////////////////////////////////////////////////////////////////////
//全局变量
HWND  g_hDlg = NULL;
PTP_WORK g_pWorkItem = NULL;//工作项对象
volatile LONG g_nCurrentTask = 0;

//自定义消息
#define WM_APP_COMPLETED  (WM_APP + 123)

//////////////////////////////////////////////////////////////////////////
void AddMessage(LPCTSTR szMsg){
    HWND hListBox = GetDlgItem(g_hDlg, IDC_LB_STATUS);
    ListBox_SetCurSel(hListBox, ListBox_AddString(hListBox, szMsg));
}

//////////////////////////////////////////////////////////////////////////
void WINAPI TaskHandler(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_WORK pWork){

    LONG currentTask = InterlockedIncrement(&g_nCurrentTask);

    TCHAR szMsg[MAX_PATH];
    StringCchPrintf(szMsg, _countof(szMsg),
                    TEXT("线程[%u]:%u号任务开始. "), GetCurrentThreadId(), currentTask);
    AddMessage(szMsg);

    //模拟许多的工作
    Sleep(currentTask * 1000);

    StringCchPrintf(szMsg, _countof(szMsg),
                    TEXT("线程[%u]:%u号任务结束. "), GetCurrentThreadId(), currentTask);
    AddMessage(szMsg);

    if (InterlockedDecrement(&g_nCurrentTask)==0){
       //通知UI线程任务己经完成
        PostMessage(g_hDlg, WM_APP_COMPLETED, 0, (LPARAM)currentTask);
    }
}

//////////////////////////////////////////////////////////////////////////
void OnStartBatch(){
    //禁用“开始”按钮
    Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), FALSE);

    AddMessage(TEXT("----开始新的批处理----"));

    //使用同一个工作项对象,提交6个任务
    for (int i = 0; i < 6;i++){
        SubmitThreadpoolWork(g_pWorkItem);
    }
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
}

//////////////////////////////////////////////////////////////////////////
BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){
    g_hDlg = hwnd;
    return TRUE;
}


void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){
    switch (id)
    {
        case IDOK:
        case IDCANCEL:
            EndDialog(hwnd, id);
            break;

        case IDC_BTN_START_BATCH:
            OnStartBatch();
            break;
    }
}


//////////////////////////////////////////////////////////////////////////
INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
    switch (uMsg){
        chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
        chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand);

    case WM_APP_COMPLETED:
        {
            TCHAR szMsg[MAX_PATH + 1];
            StringCchPrintf(szMsg, _countof(szMsg),
                            TEXT("----%u号任务是批处理的最后一个任务----"), lParam);
            AddMessage(szMsg);

            //启用“开始”按钮
            Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), TRUE);
        }

        break;
    }
    return FALSE;
}

//////////////////////////////////////////////////////////////////////////
int APIENTRY _tWinMain(HINSTANCE hInst, HINSTANCE, LPTSTR pCmdLine, int){

    //创建用于供所有任务使用的工作项对象(最后一个参数为NULL,使用进程默认的线程池!)
    g_pWorkItem = CreateThreadpoolWork(TaskHandler, NULL, NULL);
    if (NULL == g_pWorkItem){
        MessageBox(NULL, TEXT("无法创建任务所需的工作项对象"),
                   TEXT(""), MB_ICONSTOP);
        return -1;
    }


    //ttoi,将字符转为整型
    DialogBoxParam(hInst, MAKEINTRESOURCE(IDD_MAIN), NULL, Dlg_Proc,_ttoi(pCmdLine));

    //关闭工作项对象
    CloseThreadpoolWork(g_pWorkItem);
    return 0;
}

//resource.h

//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ 生成的包含文件。
// 供 11_Batch.rc 使用
//
#define IDD_MAIN                        101
#define IDC_LB_STATUS                   1001
#define IDC_BTN_START_BATCH             1002

// Next default values for new objects
// 
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NEXT_RESOURCE_VALUE        102
#define _APS_NEXT_COMMAND_VALUE         40001
#define _APS_NEXT_CONTROL_VALUE         1003
#define _APS_NEXT_SYMED_VALUE           101
#endif
#endif

//Batch.rc

// Microsoft Visual C++ generated resource script.
//
#include "resource.h"

#define APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 2 resource.
//
#include "winres.h"

/////////////////////////////////////////////////////////////////////////////
#undef APSTUDIO_READONLY_SYMBOLS

/////////////////////////////////////////////////////////////////////////////
// 中文(简体,中国) resources

#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS)
LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED

#ifdef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// TEXTINCLUDE
//

1 TEXTINCLUDE 
BEGIN
    "resource.h"
END

2 TEXTINCLUDE 
BEGIN
    "#include ""winres.h""
"
    ""
END

3 TEXTINCLUDE 
BEGIN
    "
"
    ""
END

#endif    // APSTUDIO_INVOKED


/////////////////////////////////////////////////////////////////////////////
//
// Dialog
//

IDD_MAIN DIALOGEX 0, 0, 191, 190
STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_POPUP | WS_CAPTION | WS_SYSMENU
CAPTION "利用线程池进行批处理"
FONT 9, "宋体", 400, 0, 0x86
BEGIN
    DEFPUSHBUTTON   "退出",IDOK,136,169,50,14
    LISTBOX         IDC_LB_STATUS,7,25,176,142,LBS_NOINTEGRALHEIGHT | NOT WS_BORDER | WS_VSCROLL | WS_HSCROLL | WS_TABSTOP,WS_EX_STATICEDGE
    PUSHBUTTON      "开始批处理",IDC_BTN_START_BATCH,7,7,50,14
END


/////////////////////////////////////////////////////////////////////////////
//
// DESIGNINFO
//

#ifdef APSTUDIO_INVOKED
GUIDELINES DESIGNINFO
BEGIN
    IDD_MAIN, DIALOG
    BEGIN
        LEFTMARGIN, 7
        RIGHTMARGIN, 186
        TOPMARGIN, 7
        BOTTOMMARGIN, 183
    END
END
#endif    // APSTUDIO_INVOKED

#endif    // 中文(简体,中国) resources
/////////////////////////////////////////////////////////////////////////////



#ifndef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 3 resource.
//


/////////////////////////////////////////////////////////////////////////////
#endif    // not APSTUDIO_INVOKED

11.2.2 每隔一段时间调用一个函数

(1)CreateThreadpoolTimer函数——在线程池中创建一个定时器对象

参数

描述

PTP_TIMER_CALLBACK

     PfnTimerCallback

回调函数指针,其原型为

VOID CALLBACK TimeoutCallback(

PTP_CALLBACK_INSTANCE pInstance,

PVOID pvContext,PTP_TIMER pTimer);

PVOID pvContext

传给回调函数的额外参数

PTP_CALLBACK_ENVIRON pcbe

回调环境

返回值

计时器工作项,TP_TIMER对象的指针

(2)SetThreadpoolTimer:向线程池注册计时器

参数

描述

PTP_TIMER pTimer

要设定的计时器的指针

PFILETIME pftDueTime

第1次调用回调函数的时间。

①NULL:表示停止调用回调函数。(暂停但不销毁计时器)

②小于0:表示相对时间(单位微秒),即相对于调用SetThreadpoolTimer的时间。

③-1:表示立即开始。

④正数:以100ns为单位,从1600年1月1日 开始计算的周期数。

DWORD msPeriod

触发周期(单位微秒)。0表示只触发一次。注意如果回调函数执行的时间太长,而回调函数触发的周期又很短,此时系统会启动多个线程来执行这些回调函数。

DWORD msWindowLength

用来给回调函数的执行增加一些随机性。单位微秒

①设当前设定的触发时刻为T,则下次触发的时间是[T+msPeriod,T+msPeriod+msWindowLength]之间的任何一个时间值。

②msWindowLength的另一个作用是将计时器分组。如计时器A会在5-7微秒内被触发,计时器B会在6-8微秒内。因时间有重叠,所以线程池只会唤醒一个线程来处理这两个计时器,在处理完A的回调函数后,该线程不进入睡眠,会直接再调用B的回调函数,以减少用两个线程调用时产生的额外的线程上下文切换的开销。

(3)IsThreadpoolTimerSet——确定某个计时器是否己经被设置,即pftDueTime!=NULL

(4)WaitForThreadpoolTimerCallbacks等待一个计时器完成。

(5)CloseThreadpoolTimer释放计时器的内存。

【TimedMessageBox示例程序】

/*-----------------------------------------------------------------------------------------
Module: TimedMsgBox.cpp
Notices:Copyright(c) 2008 Jeffrey Richter & Christophe Nasarre
-----------------------------------------------------------------------------------------*/

#include "../../CommonFiles/CmnHdr.h"
#include <tchar.h>
#include <strsafe.h>

//////////////////////////////////////////////////////////////////////////
TCHAR g_szCaption[100];  //消息框的标题

int g_nSecLeft = 10; //消息框中显示的剩余时间

#define ID_MSGBOX_STATIC_TEXT   0x0000FFFF //对话框中文本框的ID,这是系统默认的ID
//////////////////////////////////////////////////////////////////////////
VOID CALLBACK MsgBoxTimeoutCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PTP_TIMER pTimer){
    //注意,因竞争条件,当程序运行到这里时,可能对话框还没有创建!
    HWND hwnd = FindWindow(NULL, g_szCaption);

    if (hwnd !=NULL){
        if (g_nSecLeft ==1){
            //时间结束,强迫对话框退出
            EndDialog(hwnd, IDOK);
            return;
        }

        //如果对话框存在,则更新剩作时间
        TCHAR szMsg[100];
        StringCchPrintf(szMsg, _countof(szMsg), TEXT("还剩%d秒,按确定将取消"), --g_nSecLeft);
        SetDlgItemText(hwnd, ID_MSGBOX_STATIC_TEXT, szMsg);
    } 
}

//////////////////////////////////////////////////////////////////////////
int WINAPI _tWinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance, PTSTR lpCmdLine,int nShowCmd){
    StringCchPrintf(g_szCaption, _countof(g_szCaption), TEXT("Timed Message Box"));
    
    //创建线程池计时器对象
    PTP_TIMER lpTimer = CreateThreadpoolTimer(MsgBoxTimeoutCallback, NULL, NULL);

    if (NULL ==lpTimer){
        TCHAR szMsg[MAX_PATH];
        StringCchPrintf(szMsg, _countof(szMsg), TEXT("无法创建计时器对象:%u"), GetLastError());
        MessageBox(NULL, szMsg, TEXT("错误"), MB_OK | MB_ICONERROR);
        return (-1);
    }

    //设置定时器1秒后触发,以后每秒触发一次
    ULARGE_INTEGER ulRelativeStartTime;
    ulRelativeStartTime.QuadPart = (LONGLONG)(-1000000); //单位微秒。转换后为1秒。
    FILETIME ftRelativeStartTime;
    ftRelativeStartTime.dwHighDateTime = ulRelativeStartTime.HighPart;
    ftRelativeStartTime.dwLowDateTime = ulRelativeStartTime.LowPart;
    SetThreadpoolTimer(
        lpTimer,
        &ftRelativeStartTime,
        1000, //每隔一秒触发
        0
        );

    //显示消息框
    MessageBox(NULL, TEXT("还剩10秒,按确定将取消"), g_szCaption, MB_OK);

    //清除计时器对象
    CloseThreadpoolTimer(lpTimer);

    //判断是用户取消计时或者是超时
    MessageBox(NULL, (g_nSecLeft == 1) ? TEXT("超时") : TEXT("用户取消"), TEXT("结果"), MB_OK);
    return 0;

}

 【NewWorkPool程序】简单的工作项和定时器回调演示

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>

//////////////////////////////////////////////////////////////////////////
//简单的工作项函数
VOID CALLBACK MyWorkCallback(PTP_CALLBACK_INSTANCE pInstance, 
                             PVOID pvContext,
                             PTP_WORK pWork){
    BOOL bRet = FALSE;
    DWORD dwPriorityOriginal = 0;

    dwPriorityOriginal = GetThreadPriority(GetCurrentThread());

    if (THREAD_PRIORITY_ERROR_RETURN == dwPriorityOriginal){
        _tprintf(_T("GetThreadPriority失败。错误码:%u
"), GetLastError());
        return;
    }


    bRet = SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);

    if (FALSE == bRet){
        _tprintf(_T("SetThreadPriority失败。错误码:%u
"), GetLastError());
        return;
    }

    _tprintf(_T("[ID:0x%X] MyWorkCallback Runing...
"), GetCurrentThreadId());
    
    bRet = SetThreadPriority(GetCurrentThread(), dwPriorityOriginal);

    if (FALSE == bRet){
        _tprintf(_T("SetThreadPriority失败。错误码:%u
"), GetLastError());
        return;
    }
    return;
}


//简单的定时回调函数
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE pInstance,
                             PVOID pvContext,
                             PTP_TIMER pTimer){
    _tprintf(_T("[ID:0x%X] MyTimerCallback Runing...
"), GetCurrentThreadId());
}
//////////////////////////////////////////////////////////////////////////
int _tmain(){
    _tsetlocale(LC_ALL, _T("chs"));

    BOOL bRet = FALSE;
    PTP_WORK pWork = NULL;
    PTP_TIMER pTimer = NULL;
    PTP_POOL  pPool = NULL;
    PTP_WORK_CALLBACK  workcallback = MyWorkCallback;
    PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
    TP_CALLBACK_ENVIRON CallbackEnviron;
    PTP_CLEANUP_GROUP cleanupgroup = NULL;
    FILETIME ftDueTime;
    ULARGE_INTEGER ulDueTime;

    UINT rollback = 0;

    __try{
        //初始化环境块
        InitializeThreadpoolEnvironment(&CallbackEnviron);

        //创建线程池
        pPool = CreateThreadpool(NULL);

        if (NULL == pPool){
            _tprintf(_T("创建线程池失败!错误码:%u
"), GetLastError());
            __leave;
        }

        rollback = 1;//创建线程池成功!

        //设置线程数
        SetThreadpoolThreadMaximum(pPool, 8);
        bRet = SetThreadpoolThreadMinimum(pPool, 2);

        if (!bRet){
            _tprintf(_T("SetThreadpoolThreadMinimum失败,错误码:%u
"), GetLastError());
            __leave;
        }

        //创建资源清理器
        cleanupgroup = CreateThreadpoolCleanupGroup();
        if (NULL == cleanupgroup){
            _tprintf(_T("CreateThreadpoolCleanupGroup失败!错误码:%u
"), GetLastError());
            __leave;    
        }

        rollback = 2; //资源清理器创建成功

        //将环境块与线程池关联
        SetThreadpoolCallbackPool(&CallbackEnviron, pPool);

        //将清理器与环境块联联
        SetThreadpoolCallbackCleanupGroup(&CallbackEnviron, cleanupgroup,NULL);

        //创建线程池需要的回调函数,这里是一个普通的工作项
        pWork = CreateThreadpoolWork(workcallback, NULL, &CallbackEnviron);

        if (NULL == pWork){
            _tprintf(_T("创建线程池普通工作项失败!错误码:%u
"), GetLastError());
            __leave;
        }

        rollback = 3; //创建普通工作项成功
        SubmitThreadpoolWork(pWork); //提交工作项

        //创建一个定时回调项
        pTimer = CreateThreadpoolTimer(timercallback, NULL, &CallbackEnviron);
        if (NULL == pTimer){
            _tprintf(_T("创建线程池计时器对象失败!错误码:%u
"), GetLastError());
            __leave;
        }

        rollback = 4; //计时器对象创建成功

        //设定定时回调周期
        ulDueTime.QuadPart = (LONGLONG)-(1 * 10 * 1000 * 1000); //1秒以后触发
        ftDueTime.dwHighDateTime = ulDueTime.HighPart;
        ftDueTime.dwLowDateTime = ulDueTime.LowPart;

        SetThreadpoolTimer(pTimer, &ftDueTime, 0, 0); //只调用一次

        //主线程进入等待状态或干别的工作
        Sleep(1500);

        //当所有的线程池回调函数都被执行后,关闭清理器
        CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL);

        //事务标志,回滚到第2步,执行第2步后的销毁工作
        rollback = 2;
        __leave;

    }
    __finally{
        switch (rollback)
        {
        case 4:
        case 3:
            //关闭清理器
            CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL);
            break;

        case 2:
            //关闭清理器
            CloseThreadpoolCleanupGroup(cleanupgroup);
            break;

        case 1:
            //关闭线程池
            CloseThreadpool(pPool);
            break;

        default:
                break;
        }
    }

    _tsystem(_T("PAUSE"));

    return 0;
}

 11.2.3 在内核对象触发时调用一个函数

(1)CreateThreadpoolWait——创建一个线程池等待对象(也是一个工作项,等待项)

参数

描述

PTP_WAIT_CALLBACK

     pfnWaitCallback

回调函数指针,其原型为

VOID CALLBACK WaitCallback(

PTP_CALLBACK_INSTANCE pInstance,

PVOID pvContext,PTP_WAIT pWait,

TP_WAIT_RESULT WaitResult);

PVOID pvContext

传给回调函数的额外参数

PTP_CALLBACK_ENVIRON pcbe

回调环境

返回值

等待对象的指针

备注:回调函数的WaitResult表示回调函数被调用的原因:

WAIT_OBJECT_0:表示传给SetThreadpoolWait的内核对象在超时之前被触发。

WAIT_TIMEOUT:表示内核对象在超时之前没被触发,回调函数被执行是因为超时

WAIT_ABANDONED_0:表示内核对象一个互斥量,并且互斥量被“遗弃”,触发了回调函数。

(2)SetThreadpoolWait——将某个内核对象绑定到线程池

参数

描述

PTP_WAIT pWaitItem

传入由CreateThreadWait返回的对象的指针

HANDLE hObject

要绑定的内核对象,当该对象被触发时,会调用线程池中的WaitCallback函数。

PFILETIME pftTimeout

线程池愿意花的最长多少时间来等待内核对象触发。

0:立即返回,负值为相对时间,正值为绝对时间,NULL表示无限等待。(线程池内部调用了WaitForMultipleObjects)

备注:①线程池内部让一个线程调用WaitForMultipleObjects并传入由SetThreadpoolWait函数注册的句柄,不断地组成一个句柄组,同时将Wait*函数的bWaitAll设为FALSE,这样当任何一个句柄被触发,线程池就会被唤醒。

②因WaitForMultipleObjects不允许将同一个句柄传入多次,因此必须确保不会用SetThreadpoolWait来多次注册同一个句柄,但可以调用DuplicationHandle复制句柄并传给Set*函数。

③因WaitForMultipleObjects一次最多只能等待64个内核对象,因此线程池实际上为每64个内核对象分配一个线程来等待,所以效率比较高。因此,如果要等待超过64个以上的内核对象,可以考虑用这种线程池,因为系统会每64个内核对象,就开辟一个线程来等待这些内核对象

④一旦线程池中一个线程调用了我们的回调函数,对应的等待项将进入“不活跃”状态。这意味着如果在同一个内核对象被触发时再次调用这个回调函数时,需要调用SetThreadpoolWait再次注册。如果传入的hObject为NULL,将把pWaitItem这个等待项从线程中移除。

(3)WaitForThreadpoolWaitCallbacks:等待一个等待项完成

(4)ClosethreadpoolWait函数:释放一个等待项的内存

 【NewWaitCallback程序】演示触发内核对象时,调用一个函数

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>

//////////////////////////////////////////////////////////////////////////
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE pInstance,
                             PVOID pvContext,
                             PTP_WAIT pWait,
                             TP_WAIT_RESULT WaitResult){
    _tprintf(_T("线程[ID:0x%X] MyWaitCallback Runing...
"), GetCurrentThreadId());
}
//////////////////////////////////////////////////////////////////////////
int _tmain(){

    _tsetlocale(LC_ALL, _T("chs"));

    PTP_WAIT pWait = NULL;
    PTP_WAIT_CALLBACK pfnWaitCallback = MyWaitCallback;
    HANDLE hEvent = NULL;
    UINT rollback = 0;

    //创建一个事件对象
    hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //自动重置
    if (NULL == hEvent)
        return 0;

    rollback = 1; //创建事件对象成功

    __try{
        //创建等待线程池
        pWait = CreateThreadpoolWait(pfnWaitCallback, NULL, NULL);//利用系统默认线程池
        if (NULL == pWait){
            _tprintf(_T("CreateThreadpoolWait失败。错误码:%u
"), GetLastError());
            __leave;
        }
    
        rollback = 2;

        //模拟等待5次,注意每次等待前要调用SetThreadpoolWait方法
        for (int i = 0; i < 5;i++){
            SetThreadpoolWait(pWait, hEvent, NULL); //这句很重要
            SetEvent(hEvent);
            Sleep(500);

            //主线程等待回调线程池调用完毕
            WaitForThreadpoolWaitCallbacks(pWait, TRUE);
        }
    }__finally{
        switch (rollback)
        {
        case 2:
            SetThreadpoolWait(pWait, NULL, NULL);//取消等待项
            CloseThreadpoolWait(pWait);
            break;
        case 1:
            CloseHandle(hEvent);
            break;
        default:
            break;
        }
    }

    _tsystem(_T("PAUSE"));
    return 0;
}

11.2.4 在异步I/O请求完成时调用一个函数

 

(1)CreateThreadpoolIo:创建线程池Io对象

参数

描述

HANDLE  hDevice

要关联的设备句柄

PTP_WIN32_IO_CALLBACK

                            pfnIoCallback

回调函数指针,其原型为

VOID CALLBACK OverlappedCompletionRoutine(

PTP_CALLBACK_INSTANCE pInstance,

PVOID pvContext,PVOID pOverlapped,

ULONG IoResult,//操作结果,成功时为NO_ERROR

ULONG_PTR NumberOfBytesTransferred,//已传输字节数

PTP_IO //指向线程池中的I/O项的指针,

//即CreateThreadpoolIo的返回值

);

PVOID pvContext

传给回调函数的额外参数

PTP_CALLBACK_ENVIRON pcbe

回调环境

返回值

I/O对象(一个工作项)的指针

(2)将IO对象(工作项)与线程池内部的I/O完成端口关联

VOID StartThreadpoolIo(PTP_IO pio);

 【注意】每次调用ReadFile和Writefile之前都必调用StartThreadpoolIo,否则回调函数不会被调用(这步相当于给完成端口增加IO完成项通知)

(3)停止线程池调用回调函数:VOID CancelThreadpoolIo(PTP_IO pio);

  【注意】①当发出IO请求之后,可以用这来取消。

      ②在调用ReadFile或WriteFile失败时,仍然必须调用CancelThreadpoolIo(除了返回FALSE且GetLastError为ERROR_IO_PENDING,因这表示正在完成)

(4)等待一处待处理的IO请求完成。

  WaitForThreadIoCallbacks(pio,bCancelPendingCallbacks);

  【注意】

    ①该函数须在另一个线程使用,而不能在回调函数内部使用,因为这会造成死锁。

    ②如果bCancelPendingCallbacks为TRUE,那么当请求完成的时候,回调函数不会被调用(如果尚未被调用)。这和调用CancelThreadpoolIo函数的时候很相似。

(5)解除IO对象(工作项)与线程池的关联:VOID CloseThreadpoolIo(PTP_IO pio);

【NewIOCPPool程序】模拟写入日志文件

  效果图与【IOCPPool程序】一致。

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>

//////////////////////////////////////////////////////////////////////////
#define  QMLX_ALLOC(sz)    HeapAlloc(GetProcessHeap(),0,sz)  //QMLX:浅墨浓香
#define  QMLX_CALLOC(sz)   HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define  QMLX_SAFEFREE(p)  if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}

#define  QMLX_ASSERT(s)    if(!(s)){DebugBreak();}
#define  QMLX_BEGINTHREAD(Fun,Param)  CreateThread(NULL,0,
                                     (LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);

//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 20  //每个线程最大写入次数
#define MAXWRITETHREAD    10   //写入线程的数量

#define OP_READ    0x01  //读操作
#define OP_WRITE   0x02  //写操作

//单IO数据
typedef struct _tagPerIoData{
    OVERLAPPED  m_ol; 
    HANDLE      m_hFile;   //操作的文件句柄
    DWORD       m_dwOp;    //操作类型,OP_READ或OP_WRITE
    LPVOID      m_pData;   //操作的数据
    UINT        m_nLen;    //操作的数据长度
    DWORD       m_dwWrite; //写入的字节数 
    DWORD       m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA,*PPER_IO_DATA;

//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PVOID pOverlapped,
                           ULONG IoResult,ULONG_PTR NumberOfBytesTransferred,PTP_IO pio);

//写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam);

//当前操作的文件对象的指针
LARGE_INTEGER  g_liFilePointer = { 0 };

//IOCP线程池
PTP_IO  g_pThreadpoolIo = NULL;

//////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘’)
VOID  GetAppPath(LPTSTR pszBuffer){
    DWORD dwLen = 0;
    if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
        return;

    for (DWORD i = dwLen; i > 0;i--){
        if ('\' == pszBuffer[i]){
            pszBuffer[i + 1] = '';
            break;
        }
    }
}

int _tmain(){
    _tsetlocale(LC_ALL, _T("chs"));
    TCHAR pFileName[MAX_PATH] = {};
    GetAppPath(pFileName);
    StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));

    HANDLE ahWThread[MAXWRITETHREAD] = {};
    DWORD dwWrited = 0;

    //创建文件
    HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
                                 CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
    if (INVALID_HANDLE_VALUE == hTxtFile){
        _tprintf(_T("CreateFile(%s)失败,错误码:%u
"), GetLastError());
        _tsystem(_T("PAUSE"));
        return 0;    
    }

    //初始化线程池回调环境
    TP_CALLBACK_ENVIRON poolEnv = {};
    InitializeThreadpoolEnvironment(&poolEnv);

    //创建IOCP线程池
    g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback,hTxtFile,&poolEnv);
    
    //启动IOCP线程池
    StartThreadpoolIo(g_pThreadpoolIo);

    //写入UNICODE文件的前缀码,以便正确打开
    PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
    QMLX_ASSERT(pIo != NULL);

    pIo->m_dwOp = OP_WRITE;
    pIo->m_hFile = hTxtFile;
    pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
    QMLX_ASSERT(pIo->m_pData != NULL);
    *((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
    pIo->m_nLen = sizeof(WORD);

    //偏移文件指针
    pIo->m_ol.Offset = g_liFilePointer.LowPart;
    pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
    g_liFilePointer.QuadPart += pIo->m_nLen;
    pIo->m_dwTimestamp = GetTickCount(); //记录时间戳

    WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen, 
              &pIo->m_dwWrite,(LPOVERLAPPED)&pIo->m_ol);

    //等待IOCP线程池完成操作
    WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);

    //启动写入线程进行日志写入操作
    for (int i = 0; i < MAXWRITETHREAD;i++){
        ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
    }

    //让主线程等待这些写入线程结束
    WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);

    for (int i = 0; i < MAXWRITETHREAD;i++){
        CloseHandle(ahWThread[i]);
    }

    //关闭IOCP线程池
    CloseThreadpoolIo(g_pThreadpoolIo);

    //关闭日志文件
    if (INVALID_HANDLE_VALUE != hTxtFile){
        CloseHandle(hTxtFile);
        hTxtFile = INVALID_HANDLE_VALUE;
    }

    _tsystem(_T("PAUSE"));

    return 0;
}

//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
                           ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
    if (NO_ERROR != IoResult){
        _tprintf(_T("I/O操作出错,错误码:%u
"),IoResult);
        return;
    }

    PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
    DWORD dwCurTimestamp = GetTickCount();

    switch (pIo->m_dwOp)
    {
    case OP_WRITE://写操作结束
        {//写入操作结束
            _tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)
"),
                     GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
                     pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);

            QMLX_SAFEFREE(pIo->m_pData);
            QMLX_SAFEFREE(pIo);
        }
        break;

    case OP_READ: //读操作结束
        break;

    default:
        break;
    }
}

//写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
    TCHAR pTxtContext[MAX_LOGLEN] = {};
    PPER_IO_DATA pIo = NULL;
    size_t szLen = 0;
    LPTSTR pWriteText = NULL;

    StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入
"), 
                    GetCurrentThreadId());
    StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);

    szLen += 1;
    
    int i = 0;
    for (; i < MAXWRITEPERTHREAD;i++){
        pWriteText = (LPTSTR)QMLX_CALLOC(szLen*sizeof(TCHAR));
        QMLX_ASSERT(NULL != pWriteText);
        StringCchCopy(pWriteText, szLen, pTxtContext);

        //为每个操作申请一个“单IO数据”结构体
        pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
        QMLX_ASSERT(pIo != NULL);

        pIo->m_dwOp = OP_WRITE;
        pIo->m_hFile = (HANDLE)lpParam;
        pIo->m_pData = pWriteText;
        pIo->m_nLen = (szLen-1)*sizeof(TCHAR);

        //这里使用原子操作同步文件指针,写入不会相互覆盖
        //这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
        //比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
        *((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
                                                                        g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
        pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
        
        StartThreadpoolIo(g_pThreadpoolIo);

        //写入
        WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
                  &pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
        if (ERROR_IO_PENDING != GetLastError()){
            CancelThreadpoolIo(g_pThreadpoolIo);
        }
    }

    return i;
}

11.2.5 回调函数的终止操作

(1)回调函数的pInstance参数:当线程调用回调函数时,Windows会自动传一个pInstance参数(类型PTP_CALLBACK_INSTANCE)给回调函数,然后回调函数将这个参数又传给如下的函数,以便在这些函数在回调函数完后,执行一些相应的终止操作(主要是用来通知另一个线程,线程池中的工作项己经完成。显然,如下函数是在回调函数的内部进行调用的!

函数

终止操作

LeaveCriticalSectionWhenCallbackReturns

当回调函数返回时,线程池会自动调用LeavCriticalSection,并在参数中传入指定的CRITCAL_SECTION结构体。

ReleaseMutexWhenCallbackReturns

当回调函数返回时,线程池会自动调用ReleaseMutex,并在参数中传入指定的HANDLE。

ReleaseSemaphoreWhenCallbackReturns

当回调函数返回的时候,线程池会自动调用ReleaseSemaphore,并在参数中传入指定的HANDLE

SetEventWhenCallbackReturns

当回调函数返回的时候,线程池会自动调用SetEvent,并在参数中传入指定的HANDLE。

FreeLibraryWhenCallbackReturns

当回调函数返回的时候,线程池会自动调用FreeLibrary,并在参数中传入指定的HMOUDLE。

(注意:如果回调函数是从DLL中载入的,这个函数尤为重要,因为当线程执行完毕后,回调函数不能自己调用FreeLibrary,否则回调函数代码将从进程中清除,这样当FreeLibrary试图返回到回调函数时,会引发访问违规)

注意,对于任何一个回调函数,只能执行上述的一种终止操作。如果调用了以上的多个函数,则最后调用的终止函数会覆盖之前调用的那个。

(2)BOOL WINAPI CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci);用来通知线程池回调函数可能运行的时间会比较长。返回TRUE时,说明线程池还有其他线程可供使用。FALSE则相反。线程池会根据来决定是否创建新线程,以防止其他工作项出现挨饿现象。(只能在调用线程的回调函数里使用!

(3)DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci)

用来告诉线程池,逻辑上自己已经完成了工作。这使得任何由于调用WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的线程能早一些返回,而不必等到线程从回调函数中结束时才返回。

原文地址:https://www.cnblogs.com/5iedu/p/4812259.html