任务队列处理

客户类型:

class Custom
{
public int Id { get; set; }
}

“ 随机等待时间”任务:

static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}

“生成”任务:

static async Task TaskProducer(ConcurrentQueue<Custom> queue)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(50);
var workItem = new Custom {Id = i};
queue.Enqueue(workItem);//入列
Console.WriteLine("Task {0} has been posted", workItem.Id);
}
}

“处理”任务:内部使用循环,只要没有获得取消通知,就始终执行。也就是说, 如果没有取消命令,那么他将始终保持准备状态,如果任务都处理完了,则out workItem中返回null,但始终在循环试图TryDequeue。

static async Task TaskProcessor(
ConcurrentQueue<Custom> queue, string name, CancellationToken token)
{
Custom workItem;
bool dequeueSuccesful = false;

await GetRandomDelay(); //随机等待一定的时间,模拟处理操作。
do
{
dequeueSuccesful = queue.TryDequeue(out workItem); //注意:这里是按照先进先出次序出队!
if (dequeueSuccesful)
{
Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
}

await GetRandomDelay();
}
while (!token.IsCancellationRequested);
}


主调用函数:

static async Task RunProgram()
{
var taskQueue = new ConcurrentQueue<Custom>();
var cts = new CancellationTokenSource();

//生成任务队列taskQueue
var taskSource = Task.Run(() => TaskProducer(taskQueue));

//使用四个处理任务来处理。
Task[] taskProcessors = new Task[4]; 
for (int i = 1; i <= 4; i++)
{
string processorId = i.ToString();
taskProcessors[i-1] = Task.Run(	() => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token));
}
await taskSource; //等待任务产生完毕
cts.CancelAfter(5000); //两秒钟后取消处理程序

await Task.WhenAll(taskProcessors);//等待四个任务处理程序全部处理完毕
}

Main:

static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
Console.Read();
}




需要指出的是, 这里的任务都是按照先后进队次序而出队, 进而被处理的。虽然被处理完成的时间并不一定一样(线程的关系以及随机时间等待的原因),但是 被处理次序不变且不会产生矛盾。

原文地址:https://www.cnblogs.com/liangyuwen/p/14857548.html