程序员很苦逼,除了会写程序,还得会写博客!
背景
基于任务的程序设计、命令式数据并行和任务并行都要求能够支持并发更新的数组、列表和集合。
在.NET Framework 4 以前,为了让共享的数组、列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作。
如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合。
System.Collenctions 和 System.Collenctions.Generic 名称空间中所提供的经典列表、集合和数组的线程都不是安全的,不能接受并发请求,因此需要对相应的操作方法执行串行化。
1、下面看代码,代码中并没有实现线程安全和串行化:
class Program
{
private static object o = new object();
private static List<Beauty> _beauty { get; set; }
/* coder: 王爷
代码中 创建三个并发线程 来操作 _beauty 集合
System.Collections.Generic.List 这个列表在多个线程访问下,不能保证是安全的线程,所以不能接受并发的请求,我们必须对 ADD 方法的执行进行串行化
*/
static void Main(string[] args)
{
_beauty = new List<Beauty>();
/* 创建任务 t1,执行数据集合添加操作 */
Task t1 = Task.Factory.StartNew(() =>
{
AddBeauty();
});
/* 创建任务 t2,执行数据集合添加操作 */
Task t2 = Task.Factory.StartNew(() =>
{
AddBeauty();
});
/* 创建任务 t3,执行数据集合添加操作 */
Task t3 = Task.Factory.StartNew(() =>
{
AddBeauty();
});
Task.WaitAll(t1, t2, t3);
Console.WriteLine("美女总量:" + _beauty.Count);
Console.ReadLine();
Console.ReadKey();
}
/* 执行集合数据添加操作 */
static void AddBeauty()
{
Parallel.For(0, 1000, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "素素_M" + i;
_beauty.Add(beauty);
});
}
}
class Beauty
{
public string Name { get; set; }
}
代码中开启了三个并发操作,每个操作都向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据并没有3000条,结果如下:
为此我们需要采用 Lock 关键字,来确保每次只有一个线程来访问 _beauty.Add(beauty); 这个方法。在添加美女处修改:
static void AddBeauty()
{
Parallel.For(0, 1000, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "陈九九_G" + i;
lock (o)
{
_beauty.Add(beauty);
}
});
}
结果:
但是:锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,在并发编程中显然不适用。
注意:
1.lock不能锁定空值,但Null是不需要被释放的。
2.lock不能锁定string类型,虽然它也是引用类型的。因为字符串类型被CLR“暂留”。即整个程序中任何给定字符串都只有一个实例,具有相同内容的字符串都代表着同一个实例。因此,只要在应用程序进程中的任何位置处具有相同内容的字符串上放置了锁,就将锁定应用程序中与该字符串具有相同内容的字符串。因此,最好锁定不会被暂留的私有或受保护成员。
2、System.Collections.Concurrent
.NET Framework 4 提供了新的线程安全和扩展的并发集合,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽可能减少需要使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。
需要注意的是:
线程安全并不是没有代价的,比起 System.Collenctions 和 System.Collenctions.Generic 命名空间中的列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。
为此,在.NET Framework中提供了 System.Collections.Concurrent 新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。
1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。
2.ConcurrentBag 提供对象的线程安全的无序集合
3.ConcurrentDictionary 提供可有多个线程同时访问的键值对的线程安全集合
4.ConcurrentQueue 提供线程安全的先进先出集合
5.ConcurrentStack 提供线程安全的后进先出集合
这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。
ConcurrentQueue 是完全无锁的,能够支持并发的添加元素,先进先出。
class Program
{
private static object o = new object();
private static Queue<Beauty> _beauty2 { get; set; }
private static ConcurrentQueue<Beauty> _concurrentBeauty { get; set; }
/* coder: 王爷
代码中 创建三个并发线程 _beauty 和 _concurrentBeauty 集合,每次添加 30000 条数据 查看 一般队列Queue 和 多线程安全下的队列ConcurrentQueue 执行情况
*/
static void Main(string[] args)
{
_beauty2 = new Queue<Beauty>();
Stopwatch sw = new Stopwatch();
sw.Start();
/* 创建任务 t1,执行数据集合添加操作 */
Task t1 = Task.Factory.StartNew(() =>
{
AddBeauty();
});
/* 创建任务 t2,执行数据集合添加操作 */
Task t2 = Task.Factory.StartNew(() =>
{
AddBeauty();
});
/* 创建任务 t3,执行数据集合添加操作 */
Task t3 = Task.Factory.StartNew(() =>
{
AddBeauty();
});
Task.WaitAll(t1, t2, t3);
sw.Stop();
Console.WriteLine("List<Beauty> 美女数据量:" + _beauty2.Count);
Console.WriteLine("List<Beauty> 执行时间为:" + sw.ElapsedMilliseconds);
Thread.Sleep(1000);
_concurrentBeauty = new ConcurrentQueue<Beauty>();
Stopwatch sw1 = new Stopwatch();
sw1.Start();
/*创建任务 tk1,执行数据集合添加操作*/
Task tk1 = Task.Factory.StartNew(() =>
{
AddConcurrenBeauty();
});
/*创建任务 tk2,执行数据集合添加操作*/
Task tk2 = Task.Factory.StartNew(() =>
{
AddConcurrenBeauty();
});
/*创建任务 tk3,执行数据集合添加操作*/
Task tk3 = Task.Factory.StartNew(() =>
{
AddConcurrenBeauty();
});
Task.WaitAll(tk1, tk2, tk3);
sw1.Stop();
Console.WriteLine("ConcurrentQueue<Beauty> 美女数据量:" + _concurrentBeauty.Count);
Console.WriteLine("ConcurrentQueue<Beauty> 执行时间为:" + sw1.ElapsedMilliseconds);
Console.ReadLine();
Console.ReadKey();
}
/* 执行集合数据添加操作 */
static void AddBeauty()
{
Parallel.For(0, 30000, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "素素_M" + i;
lock (o)
{
_beauty2.Enqueue(beauty);
}
});
}
/*执行集合数据添加操作*/
static void AddConcurrenBeauty()
{
Parallel.For(0, 30000, (i) =>
{
Beauty beauty= new Beauty();
beauty.Name = "陈九九_G" + i;
_concurrentBeauty.Enqueue(beauty);
});
}
}
class Beauty
{
public string Name { get; set; }
}
需要注意的是,代码中的输出时间并不能够完全正确的展示出并发代码下的ConcurrentQueue性能,采用ConcurrentQueue在一定程度上也带来了损耗。结果如下:
3、ConcurrentQueue 还有另外两种方法:TryDequeue 尝试移除并返回 和 TryPeek 尝试返回但不移除。
class Program
{
private static object o = new object();
private static Queue<Beauty> _beauty2 { get; set; }
private static ConcurrentQueue<Beauty> _concurrentBeauty { get; set; }
static void Main(string[] args)
{
_concurrentBeauty = new ConcurrentQueue<Beauty>();
/*执行添加操作*/
Console.WriteLine("执行添加操作!");
Parallel.Invoke(AddConcurrenBeauty, AddConcurrenBeauty);
Console.WriteLine("ConcurrentQueue<Beauty> 美女数据量:" + _concurrentBeauty.Count);
/* 执行TryPeek操作,尝试返回不移除! */
Console.WriteLine("执行TryPeek操作,尝试返回不移除!");
Parallel.Invoke(PeekConcurrenBeauty, PeekConcurrenBeauty);
Console.WriteLine("ConcurrentQueue<Beauty> 美女数据量:" + _concurrentBeauty.Count);
/* 执行TryDequeue操作,尝试返回并移除! */
Console.WriteLine("执行TryDequeue操作,尝试返回并移除!");
Parallel.Invoke(DequeueConcurrenBeauty, DequeueConcurrenBeauty);
Console.WriteLine("ConcurrentQueue<Beauty> 美女数据量:" + _concurrentBeauty.Count);
Console.ReadKey();
}
/*执行集合数据添加操作*/
static void AddConcurrenBeauty()
{
Parallel.For(0, 100, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "陈九九_G" + i;
_concurrentBeauty.Enqueue(beauty);
});
}
/* 尝试返回 但不移除 */
static void PeekConcurrenBeauty()
{
Parallel.For(0, 2, (i) =>
{
Beauty beauty = null;
bool excute = _concurrentBeauty.TryPeek(out beauty);
Console.WriteLine(beauty.Name);
});
}
/* 尝试返回 并移除 */
static void DequeueConcurrenBeauty()
{
Parallel.For(0, 2, (i) =>
{
Beauty beauty = null;
bool excute = _concurrentBeauty.TryDequeue(out beauty);
Console.WriteLine(beauty.Name);
});
}
}
class Beauty
{
public string Name { get; set; }
}
需要注意 TryDequeue 和 TryPeek 的无序性,在多线程下
4、ConcurrentStack 是完全无锁的,能够支持并发的添加元素,后进先出。
class Program
{
private static object o = new object();
private static Stack<Beauty> _beauty3 { get; set; }
private static ConcurrentStack<Beauty> _concurrenBeauty2 { get; set; }
/* coder: 王爷
代码中 创建三个并发线程 来操作_beauty 和 _concurrenBeauty 集合,每次添加 30000 条数据 查看 一般Stack 和 多线程安全下的 ConcurrentStack 执行情况
*/
static void Main(string[] args)
{
Thread.Sleep(1000);
_beauty3 = new Stack<Beauty>();
Stopwatch sw = new Stopwatch();
sw.Start();
/* 创建任务 t1,执行数据集合添加操作 */
Task t1 = Task.Factory.StartNew(() =>
{
AddBeauty2();
});
/* 创建任务 t2,执行数据集合添加操作 */
Task t2 = Task.Factory.StartNew(() =>
{
AddBeauty2();
});
/* 创建任务 t3,执行数据集合添加操作 */
Task t3 = Task.Factory.StartNew(() =>
{
AddBeauty2();
});
Task.WaitAll(t1, t2, t3);
sw.Stop();
Console.WriteLine("List<Beauty> 美女数据量:" + _beauty3.Count);
Console.WriteLine("LIst<Beauty> 执行时间为:" + sw.ElapsedMilliseconds);
Thread.Sleep(1000);
_concurrenBeauty2 = new ConcurrentStack<Beauty>();
Stopwatch sw2 = new Stopwatch();
sw2.Start();
/*创建任务 tk1,执行数据集合添加操作*/
Task tk1 = Task.Factory.StartNew(() =>
{
AddConcurrenBeauty2();
});
/*创建任务 tk2,执行数据集合添加操作*/
Task tk2 = Task.Factory.StartNew(() =>
{
AddConcurrenBeauty2();
});
/*创建任务 tk3,执行数据集合添加操作*/
Task tk3 = Task.Factory.StartNew(() =>
{
AddConcurrenBeauty2();
});
Task.WaitAll(tk1, tk2, tk3);
sw2.Stop();
Console.WriteLine("ConcurrentStack<Beauty> 美女数据量:" + _concurrenBeauty2.Count);
Console.WriteLine("ConcurrentStack<Beauty> 执行时间为:" + sw2.ElapsedMilliseconds);
Console.ReadKey();
}
/* 执行集合数据添加操作 */
static void AddBeauty2()
{
Parallel.For(0, 30000, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "素素_M" + i;
lock (o)
{
_beauty3.Push(beauty);
}
});
}
/*执行集合数据添加操作*/
static void AddConcurrenBeauty2()
{
Parallel.For(0, 30000, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "陈九九_G" + i;
_concurrenBeauty2.Push(beauty);
});
}
}
class Beauty
{
public string Name { get; set; }
}
5、ConcurrentStack 还有另外两种方法:TryPop 尝试移除并返回 和 TryPeek 尝试返回但不移除。
class Program
{
private static object o = new object();
private static Stack<Beauty> _beauty3 { get; set; }
private static ConcurrentStack<Beauty> _concurrenBeauty2 { get; set; }
static void Main(string[] args)
{
_concurrenBeauty2 = new ConcurrentStack<Beauty>();
/*执行添加操作*/
Console.WriteLine("执行添加操作");
Parallel.Invoke(AddConcurrenBeauty2, AddConcurrenBeauty2);
Console.WriteLine("ConcurrentStack<Beauty> 当前数据量为:" + _concurrenBeauty2.Count);
/*执行TryPeek操作 尝试返回不移除*/
Console.WriteLine("执行TryPeek操作 尝试返回不移除");
Parallel.Invoke(PeekConcurrenBeauty2, PeekConcurrenBeauty2);
Console.WriteLine("ConcurrentStack<Beauty> 当前数据量为:" + _concurrenBeauty2.Count);
/*执行TryDequeue操作 尝试返回并移除*/
Console.WriteLine("执行TryPop操作 尝试返回并移除");
Parallel.Invoke(PopConcurrenBeauty2, PopConcurrenBeauty2);
Console.WriteLine("ConcurrentStack<Beauty> 当前数据量为:" + _concurrenBeauty2.Count);
Console.ReadKey();
}
/*执行集合数据添加操作*/
static void AddConcurrenBeauty2()
{
Parallel.For(0, 100, (i) =>
{
Beauty beauty = new Beauty();
beauty.Name = "陈九九_G" + i;
_concurrenBeauty2.Push(beauty);
});
}
/*尝试返回 但不移除*/
static void PeekConcurrenBeauty2()
{
Parallel.For(0, 2, (i) =>
{
Beauty beauty = null;
bool excute = _concurrenBeauty2.TryPeek(out beauty);
Console.WriteLine(beauty.Name);
});
}
/*尝试返回 并 移除*/
static void PopConcurrenBeauty2()
{
Parallel.For(0, 2, (i) =>
{
Beauty beauty = null;
bool excute = _concurrenBeauty2.TryPop(out beauty);
Console.WriteLine(beauty.Name);
});
}
}
class Beauty
{
public string Name { get; set; }
}