List集合是非线程安全的,所以我们采用并行编程时会发生错误。如下图所示
Parallel.For(0, 1000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_Products.Add(product);
});
代码中开启并发操作向集合中添加1000条数据,在没有保障线程安全和串行化的运行下,实际得到的数据不到1000条。为此我们需要采用Lock关键字,来确保每次只有一个线程来访问 ,可以得到1000条数据。
private static object o = new object();
Parallel.For(0, 1000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
lock (o)
{
_Products.Add(product);
}
});
但是锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,在并发编程中显然不适用。为此,在.NET Framework 4中提供了System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽 可能减少需要使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。
线程安全并不是没有代价的,比起System.Collenctions和System.Collenctions.Generic命名空间中的 列表、集合和数组来说,并发集合会有更大的开销。因此,应该只在需要从多个任务中并发访问集合的时候才使用并发几个,在串行代码中使用并发集合是没有意义 的,因为它们会增加无谓的开销。
通过这个命名空间能访问以下为并发做好了准备的集合。
1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。
2.ConcurrentBag 提供对象的线程安全的无序集合
3.ConcurrentDictionary 提供可有多个线程同时访问的键值对的线程安全集合
4.ConcurrentQueue 提供线程安全的先进先出集合
5.ConcurrentStack 提供线程安全的后进先出集合
这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。
_ConcurrenProducts = new ConcurrentQueue(); Task tk1 = Task.Factory.StartNew(() => { AddConcurrenProducts(); }); Task.WaitAll(tk1);
static void AddConcurrenProducts()
{
Parallel.For(0, 30000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
_ConcurrenProducts.Enqueue(product);
});
}
在处理的List集合数据时,可将其放到ConcurrentQueue中,然后开启多个线程去处理数据,处理完成后,再到队列中获取下一个待处理数据。
////// 多线程处理数据(无返回值) /// ///数据类型 /// 待处理数据 /// 数据处理方法(有参数无返回值) /// 处理线程数量 /// 是否等待执行结束 static void RunTask(List list, Action action, int threadCount = 5, bool waitFlag = true) { ConcurrentQueue queue = new ConcurrentQueue (list); Task[] tasks = new Task[threadCount]; for (int i = 0; i < threadCount; i++) { tasks[i] = Task.Factory.StartNew(() => { while (queue.TryDequeue(out T t)) { action(t); } }); } if (waitFlag) { Task.WaitAll(tasks); } } /// /// 多线程处理数据(返回处理后列表) /// ///数据类型 /// 待处理数据 /// 数据处理方法(有参数有返回值) /// 处理线程数量 ///数据处理后结果 static ListRunTask (List list, Func func, int threadCount = 5) { var result = new List (); ConcurrentQueue queue = new ConcurrentQueue (list); Task >[] tasks = new Task
>[threadCount]; for (int i = 0; i < threadCount; i++) { tasks[i] = Task.Factory.StartNew
>(() => { var rList = new List
(); while (queue.TryDequeue(out T t)) { rList.Add(func(t)); } return rList; }); } Task.WaitAll(tasks); for (int i = 0; i < threadCount; i++) { result.AddRange(tasks[i].Result); } return result; }



