异步任务调度二

异步任务调度二

适用于DELPHI7及以上版本,支持跨操作系统。

/// <author>cxg 2020-7-14</author>
(*使用:
unit Unit1;

interface

uses    tasks, MsgPack,
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls;

type
  TForm1 = class(TForm)
    Button1: TButton;
    procedure Button1Click(Sender: TObject);
  private
    { Private declarations }
    tasks: TThreadCfg;
  public
    { Public declarations }
    procedure callback(task: TMsgPack);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

procedure TForm1.Button1Click(Sender: TObject);
var task: TMsgPack;
  queue: TTaskQueue;
begin
  task := TMsgPack.Create;
  task.Force('f1').AsString := '测试';
  queue := TTaskQueue.Create;
  queue.enQueue(task);
  tasks := TThreadCfg.Create(1, queue);
  tasks.onCallback := callback;
  tasks.newThreads;
end;

procedure TForm1.callback(task: TMsgPack);
begin
  Caption := task.force('f1').AsString;
  tasks.Free;
end;

end.
*)

unit tasks;

interface

uses
  {$IFDEF mswindows}
  Windows,
  {$ENDIF}
  MsgPack, Contnrs,
  SyncObjs, Classes,
  SysUtils;

type
  TCallBack = procedure(task: TMsgPack) of object;

type
  TTaskQueue = class   //任务队列(线程安全)
  private
    fQueue: TQueue;
    fCS: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure enQueue(task: Pointer);
    function deQueue: Pointer;
  end;

type
  TThreadCfg = class     //管理 工作线程
  private
    fQueue: TTaskQueue;
    fCallBack: TCallBack;
    fThreadNum: Integer;
    fWorkers: array of TThread;
  public
    constructor Create(const threadNum: Integer; const queue: TTaskQueue);
    destructor Destroy; override;
    procedure newThreads;
    property onCallback: TCallBack read fCallBack write fCallBack;
  end;

type
  TWorkThread = class(TThread)  //工作线程
  private
    fConfig: TThreadCfg;
  public
    constructor Create(cfg: TThreadCfg);
    destructor Destroy; override;
    procedure Execute; override;
  end;  

implementation

function GetCPUNum: Integer;
{$IFDEF MSWINDOWS}
var
  si: SYSTEM_INFO;
{$ENDIF}
begin
  {$IFDEF MSWINDOWS}
  GetSystemInfo(si);
  Result := si.dwNumberOfProcessors;
  {$ELSE}// Linux,MacOS,iOS,Andriod{POSIX}
  {$IFDEF POSIX}
  Result := sysconf(_SC_NPROCESSORS_ONLN);
  {$ELSE}// unkown system, default 1
  Result := 5;
  {$ENDIF POSIX}
  {$ENDIF MSWINDOWS}
end;

{ TTaskQueue }

constructor TTaskQueue.Create;
begin
  fQueue := TQueue.Create;
  fCS := TCriticalSection.Create;
end;

function TTaskQueue.deQueue: Pointer;
begin
  fCS.Enter;
  Result := fQueue.Pop;
  fCS.Leave;
end;

destructor TTaskQueue.Destroy;
begin
  FreeAndNil(fQueue);
  FreeAndNil(fCS);
  inherited;
end;

procedure TTaskQueue.enQueue(task: Pointer);
begin
  fCS.Enter;
  fQueue.Push(task);
  fCS.Leave;
end;

{ TThreadCfg }

constructor TThreadCfg.Create(const threadNum: Integer;
  const queue: TTaskQueue);
begin
  fThreadNum := threadNum;
  fQueue := queue;
  if fThreadNum = 0 then
    fThreadNum := GetCPUNum;
  SetLength(fWorkers, fThreadNum);
end;

destructor TThreadCfg.Destroy;
var i: Integer;
begin
  for i := 0 to fThreadNum - 1 do  //停止并释放工作线程
  begin
    fWorkers[i].Terminate;
    fWorkers[i].WaitFor;
    fWorkers[i].Free;
  end;
  fQueue.Free;       //释放队列
  inherited;
end;

procedure TThreadCfg.newThreads;
var i: Integer;
begin
  for i := 0 to fThreadNum - 1 do
  begin
    fWorkers[i] := TWorkThread.Create(Self);
    fWorkers[i].Resume;
  end;
end;

{ TWorkThread }

constructor TWorkThread.Create(cfg: TThreadCfg);
begin
  inherited Create(True);
  FreeOnTerminate := true;
  fConfig := cfg;
end;

destructor TWorkThread.Destroy;
begin

  inherited;
end;

procedure TWorkThread.Execute;
var pack: TMsgPack;
begin
  while not Self.Terminated do
  begin
    if fConfig.fQueue.fQueue.Count > 0 then
    begin
      pack := TMsgPack(fConfig.fQueue.deQueue);
      if Assigned(fConfig.fCallBack) then
      begin
        fConfig.fCallBack(pack);
        pack.Free;    //释放
      end;
    end;
    Sleep(1);
    {$IFDEF mswindows}
    SwitchToThread;
    {$ELSE}
    TThread.Yield;
    {$ENDIF}
  end;
end;

end.

  

  

原文地址:https://www.cnblogs.com/hnxxcxg/p/13298034.html