线程池处理异步任务队列

线程池处理异步任务队列

/// <author>cxg 2020-9-3</author>
/// 线程池处理异步任务队列
/// 支持d7以上版本,更低版本没有测试,支持跨OS

unit tasks;

interface

uses
  {$IFDEF mswindows}
  Windows,
  {$ENDIF}
  {$IFDEF posix}
  posix.Unistd, posix.Semaphore,
  {$ENDIF}
  Contnrs, SyncObjs, Classes, SysUtils;

type
  /// Method pointer ("of object") that receives one task as an untyped
  /// pointer. Worker threads invoke it from inside their Execute loop, so the
  /// handler runs on a worker thread, not on the thread that queued the task.
  TCallBack = procedure(task: Pointer) of object;

type
  /// Owner of a fixed-size set of worker threads. Tasks handed to allotTask
  /// are distributed over the workers in turn, and each worker passes the
  /// tasks it receives to the onCallback handler.
  TThreadConf = class
  private
    // Handler the workers call for every non-nil task they dequeue.
    fCallBack: TCallBack;
    // Number of worker threads; also the length of fWorkers.
    fThreadNum: Integer;
    // Worker instances (created as TWorkThread in newThreads).
    fWorkers: array of TThread;
    // Held by allotTask while it picks a worker and enqueues the task.
    fCS: TCriticalSection;
    // Terminates, waits for and frees every worker.
    procedure freeThreads;
    // Creates one TWorkThread per slot of fWorkers.
    procedure newThreads;
  public
    // threadNum = 0 (the default) means "use cpuNum".
    // The workers are created here but not started; call startThreads.
    constructor Create(const threadNum: Integer = 0);
    destructor Destroy; override;
    // Starts (resumes) every worker thread.
    procedure startThreads;
    // Suspends every worker thread.
    procedure stopThreads;
    // Queues a task on the next worker in rotation.
    procedure allotTask(task: Pointer);
    // Handler invoked for each task; read by the workers when a task is run.
    property onCallback: TCallBack read fCallBack write fCallBack;
  end;

type
  /// Worker thread with its own task queue. It sleeps on a semaphore that
  /// enqueue signals, then dequeues a task and passes it to the owning
  /// TThreadConf's callback.
  TWorkThread = class(TThread)
  private
    // Owning pool; supplies the callback to run for each task.
    fConf: TThreadConf;
    // Pending tasks for this worker (Contnrs.TQueue of untyped pointers).
    fQueue: TQueue;
    {$IFDEF mswindows}
    hsem: THandle; // semaphore (Win32 handle)
    {$ELSE}
    hsem: sem_t; // semaphore (POSIX unnamed semaphore)
    {$ENDIF}
  public
    // Creates the thread suspended and binds it to the given pool.
    constructor Create(conf: TThreadConf);
    destructor Destroy; override;
    // Thread body: wait for the semaphore, then run a queued task.
    procedure Execute; override;
    // Adds a task to this worker's queue and signals the semaphore.
    procedure enqueue(task: Pointer);
  end;

/// Number of processors reported by the operating system; used as the default
/// worker count by TThreadConf.Create.
function cpuNum: Integer;

implementation

var
  // Rotation counter used by TThreadConf.allotTask to choose the next worker.
  // It is unit-global, so it is shared by every TThreadConf instance.
  gIndex: Integer = 0;

/// Returns the number of processors reported by the operating system.
/// The result is never less than 1, so it can be used directly as a thread
/// count and as a divisor.
function cpuNum: Integer;
{$IFDEF MSWINDOWS}
var
  si: SYSTEM_INFO;
{$ENDIF}
begin
  {$IFDEF MSWINDOWS}
  GetSystemInfo(si);
  Result := si.dwNumberOfProcessors;
  {$ELSE}
  // sysconf returns -1 when the value cannot be determined.
  Result := sysconf(_SC_NPROCESSORS_ONLN);
  {$ENDIF}
  // TThreadConf sizes its worker array with this value and takes
  // "mod" by it, so a zero or negative count must never escape.
  if Result < 1 then
    Result := 1;
end;

{ TThreadConf }

/// Hands a task to the next worker in round-robin order.
/// task: opaque pointer later passed to the onCallback handler on a worker
/// thread. The selection and the enqueue are done under fCS.
procedure TThreadConf.allotTask(task: Pointer);
var
  i: Integer;
begin
  fCS.Enter;
  try
    i := gIndex mod fThreadNum;
    TWorkThread(fWorkers[i]).enqueue(task);
    // Store the already-wrapped position instead of an ever-growing counter:
    // an unbounded Inc would eventually overflow Integer, and a negative
    // value makes "mod" yield a negative (out-of-range) array index.
    // The index only advances if enqueue did not raise.
    gIndex := (i + 1) mod fThreadNum;
  finally
    fCS.Leave;
  end;
end;

/// Creates the pool and its (not yet started) worker threads.
/// threadNum: number of workers; 0 or any negative value means "one per
/// processor as reported by cpuNum". Call startThreads to start the workers.
constructor TThreadConf.Create(const threadNum: Integer = 0);
begin
  inherited Create;
  fCS := TCriticalSection.Create;
  fThreadNum := threadNum;
  // Treat negative values like 0: passing them on would make SetLength fail.
  if fThreadNum <= 0 then
    fThreadNum := cpuNum;
  // allotTask divides by fThreadNum, so it must be at least 1 even if the
  // processor count could not be determined.
  if fThreadNum < 1 then
    fThreadNum := 1;
  SetLength(fWorkers, fThreadNum);
  newThreads;
end;

/// Tears the pool down: the worker threads are released first, then the
/// critical section, then the inherited destructor runs.
destructor TThreadConf.Destroy;
begin
  freeThreads;
  fCS.Free;
  fCS := nil;
  inherited Destroy;
end;

/// Stops and frees every worker thread.
/// For each worker: request termination, wake it from its semaphore wait,
/// make sure it is not suspended, wait for it to finish, then free it.
procedure TThreadConf.freeThreads;
var
  i: Integer;
  worker: TWorkThread;
begin
  for i := 0 to High(fWorkers) do
  begin
    worker := TWorkThread(fWorkers[i]);
    // Slots stay nil when the constructor failed part-way through newThreads.
    if worker = nil then
      Continue;
    fWorkers[i] := nil;

    // This method waits for and frees the thread itself; a thread that also
    // frees itself on termination would be destroyed twice.
    worker.FreeOnTerminate := False;
    worker.Terminate;

    // The worker sleeps in an infinite semaphore wait, so Terminate alone is
    // never noticed. A nil task signals the semaphore; the worker ignores nil
    // tasks and then sees Terminated.
    fCS.Enter;
    try
      worker.enqueue(nil);
    finally
      fCS.Leave;
    end;

    // A worker that was never started, or was suspended by stopThreads,
    // cannot reach the end of Execute; WaitFor would block forever.
    if worker.Suspended then
      worker.Suspended := False;

    worker.WaitFor;
    worker.Free;
  end;
end;

/// Fills every slot of fWorkers with a new worker bound to this pool.
/// The workers are only constructed here, not started.
procedure TThreadConf.newThreads;
var
  slot: Integer;
begin
  slot := 0;
  while slot < fThreadNum do
  begin
    fWorkers[slot] := TWorkThread.Create(Self);
    Inc(slot);
  end;
end;

/// Sets every worker thread running, in slot order.
/// Compilers that define "unicode" use TThread.Start; older ones use Resume.
procedure TThreadConf.startThreads;
var
  idx: Integer;
  worker: TThread;
begin
  for idx := 0 to Pred(fThreadNum) do
  begin
    worker := fWorkers[idx];
    {$IFDEF unicode}
    worker.Start;
    {$ELSE}
    worker.Resume;
    {$ENDIF}
  end;
end;

/// Suspends every worker thread, in slot order.
/// NOTE(review): a worker is paused wherever it happens to be, possibly
/// in the middle of a callback - confirm callers can tolerate that.
procedure TThreadConf.stopThreads;
var
  idx: Integer;
  worker: TThread;
begin
  idx := 0;
  while idx < fThreadNum do
  begin
    worker := fWorkers[idx];
    worker.Suspend;
    Inc(idx);
  end;
end;

{ TWorkThread }

/// Creates a suspended worker bound to the given pool.
/// conf: owning TThreadConf; its callback is invoked for every task.
/// The thread does not free itself: the owner is expected to wait for it and
/// free it (TThreadConf.freeThreads does so).
/// Raises EOSError when the semaphore cannot be created.
constructor TWorkThread.Create(conf: TThreadConf);
begin
  inherited Create(True);
  // The owner calls WaitFor and Free on this object. With FreeOnTerminate
  // enabled the instance could already be gone by then, which would be a
  // use-after-free and a double free.
  FreeOnTerminate := False;
  fConf := conf;
  fQueue := TQueue.Create;
  {$IFDEF mswindows}
  // One semaphore count per queued task. A maximum of 1 makes
  // ReleaseSemaphore fail while a signal is still pending, so tasks queued in
  // a burst would never get their own wake-up.
  hsem := CreateSemaphore(nil, 0, MaxInt, nil);
  if hsem = 0 then
    RaiseLastOSError;
  {$ELSE}
  if sem_init(hsem, 0, 0) <> 0 then
    RaiseLastOSError;
  {$ENDIF}
end;

/// Releases the task queue and the semaphore, then runs the inherited
/// TThread destructor. Tasks still in the queue are not freed here; the queue
/// only stores the pointers.
destructor TWorkThread.Destroy;
begin
  fQueue.Free;
  fQueue := nil;
  {$IFDEF mswindows}
  CloseHandle(hsem);
  {$ELSE}
  sem_destroy(hsem);
  {$ENDIF}
  inherited Destroy;
end;

/// Appends a task to this worker's queue and signals the semaphore the
/// worker waits on.
/// task: opaque pointer handed to the pool's callback by Execute.
/// This method takes no lock itself; TThreadConf.allotTask calls it while
/// holding the pool's critical section.
procedure TWorkThread.enqueue(task: Pointer);
begin
  // Queue the item before signalling, so the woken worker finds it.
  fQueue.Push(task);
{$IFDEF mswindows}
  ReleaseSemaphore(hsem, 1, nil);
{$ELSE}
  sem_post(hsem);
{$ENDIF}
end;

/// Thread body. Sleeps until enqueue signals the semaphore, then runs every
/// task currently in the queue through the pool's callback. nil tasks are
/// skipped (they only serve as a wake-up). The loop ends once Terminated is
/// set; tasks still queued at that point are left in the queue.
procedure TWorkThread.Execute;
var
  task: Pointer;

  // Blocks until the semaphore is signalled. True when the wait succeeded.
  function waitForSignal: Boolean;
  begin
    {$IFDEF mswindows}
    Result := WaitForSingleObject(hsem, INFINITE) = WAIT_OBJECT_0;
    {$ELSE}
    // sem_wait returns 0 on success and -1 on error; it never returns a
    // positive value, so testing "> 0" would never run a task.
    Result := sem_wait(hsem) = 0;
    {$ENDIF}
  end;

  // Takes the oldest queued task, if any. The pool's critical section is the
  // lock allotTask holds while pushing, so Push and Pop cannot interleave.
  function fetch(out item: Pointer): Boolean;
  begin
    item := nil;
    fConf.fCS.Enter;
    try
      // Pop on an empty TQueue raises, and one wake-up may stand for zero or
      // several queued items, so always check the count first.
      Result := fQueue.Count > 0;
      if Result then
        item := fQueue.Pop;
    finally
      fConf.fCS.Leave;
    end;
  end;

begin
  while not Terminated do
    if waitForSignal then
      // Drain the queue: signals can coalesce, so one wake-up may have to
      // serve several tasks. The callback runs outside the lock.
      while (not Terminated) and fetch(task) do
        if (task <> nil) and Assigned(fConf.fCallBack) then
          fConf.fCallBack(task);
end;

end.

  

原文地址:https://www.cnblogs.com/hnxxcxg/p/13605686.html