栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程

在并行编程中,经常会遇到多线程间操作共享集合的问题,很多时候大家都很难逃避这个问题做到一种无锁编程状态,你也知道一旦给共享集合套上lock之后,并发和伸缩能力往往会造成很大影响,这篇就来谈谈如何尽可能的减少lock锁次数甚至没有。

一:缘由 1. 业务背景

昨天在review代码的时候,看到以前自己写的这么一段代码,精简后如下:

 private static List ExecuteFilterList(int shopID, List trades, List filterItemList, MatrixSearchContext searchContext)
 {
     var customerIDList = new List();

     var index = 0;

     Parallel.ForEach(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 4 },
(filterItem) =>
     {
  var context = new FilterItemContext()
  {
      StartTime = searchContext.StartTime,
      EndTime = searchContext.EndTime,
      ShopID = shopID,
      Field = filterItem.Field,
      FilterType = filterItem.FilterType,
      ItemList = filterItem.FilterValue,
      SearchList = trades.ToList()
  };

  var smallCustomerIDList = context.Execute();

  lock (filterItemList)
  {
      if (index == 0)
      {
   customerIDList.AddRange(smallCustomerIDList);
   index++;
      }
      else
      {
   customerIDList = customerIDList.Intersect(smallCustomerIDList).ToList();
      }
  }
     });

     return customerIDList;
 }

这段代码实现的功能是这样的,filterItemList承载着所有原子化的筛选条件,然后用多线程的形式并发执行里面的item,最后将每个item获取的客户人数集合在高层进行整体求交,画个简图就是下面这样。

2. 问题分析

其实这代码存在着一个很大的问题,在Parallel中直接使用lock锁的话,filterItemList有多少个,我的lock就会锁多少次,这对并发和伸缩性是有一定影响的,现在就来想想怎么优化吧!

3. 测试案例

为了方便演示,我模拟了一个小案例,方便大家看到实时结果,修改后的代码如下:

 public static void Main(string[] args)
 {
     var filterItemList = new List() { "conditon1", "conditon2", "conditon3", "conditon4", "conditon5", "conditon6" };
     ParallelTest1(filterItemList);
 }

 public static void ParallelTest1(List filterItemList)
 {
     var totalCustomerIDList = new List();

     bool isfirst = true;

     Parallel.ForEach(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (query) =>
     {
  var smallCustomerIDList = GetCustomerIDList(query);

  lock (filterItemList)
  {
      if (isfirst)
      {
   totalCustomerIDList.AddRange(smallCustomerIDList);
   isfirst = false;
      }
      else
      {
   totalCustomerIDList = totalCustomerIDList.Intersect(smallCustomerIDList).ToList();
      }

      Console.WriteLine($"{DateTime.Now} 被锁了");
  }
     });

     Console.WriteLine($"最后交集客户ID:{string.Join(",", totalCustomerIDList)}");
 }

 public static List GetCustomerIDList(string query)
 {
     var dict = new Dictionary>()
     {
  ["conditon1"] = new List() { 1, 2, 4, 7 },
  ["conditon2"] = new List() { 1, 4, 6, 7 },
  ["conditon3"] = new List() { 1, 4, 5, 7 },
  ["conditon4"] = new List() { 1, 2, 3, 7 },
  ["conditon5"] = new List() { 1, 2, 4, 5, 7 },
  ["conditon6"] = new List() { 1, 3, 4, 7, 9 },
     };

     return dict[query];
 }

------ output ------
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
2020/04/21 15:53:34 被锁了
最后交集客户ID:1,7

二:第一次优化

从结果中可以看到,filterItemList有6个,锁次数也是6次,那如何降低呢? 其实实现Parallel代码的FCL大神也考虑到了这个问题,从底层给了一个很好的重载,如下所示:


public static ParallelLoopResult ForEach(OrderablePartitioner source, ParallelOptions parallelOptions, Func localInit, Func body, Action localFinally);

这个重载很特别,多了两个参数localInit和localFinally,过会说一下什么意思,先看修改后的代码体会一下


 public static void ParallelTest2(List filterItemList)
 {
     var totalCustomerIDList = new List();
     var isfirst = true;

     Parallel.ForEach>(filterItemList,
new ParallelOptions() { MaxDegreeOfParallelism = 2 },
() => { return null; },
      (query, loop, index, smalllist) =>
      {
   var smallCustomerIDList = GetCustomerIDList(query);

   if (smalllist == null) return smallCustomerIDList;

   return smalllist.Intersect(smallCustomerIDList).ToList();
      },
     (finalllist) =>
     {
  lock (filterItemList)
  {
      if (isfirst)
      {
   totalCustomerIDList.AddRange(finalllist);
   isfirst = false;
      }
      else
      {
   totalCustomerIDList = totalCustomerIDList.Intersect(finalllist).ToList();
      }
      Console.WriteLine($"{DateTime.Now} 被锁了");
  }
     });
     Console.WriteLine($"最后交集客户ID:{string.Join(",", totalCustomerIDList)}");
 }

------- output ------
2020/04/21 16:11:46 被锁了
2020/04/21 16:11:46 被锁了
最后交集客户ID:1,7
Press any key to continue . . .

很好,这次优化将lock次数从6次降到了2次,这里我用了 new ParallelOptions() { MaxDegreeOfParallelism = 2 } 设置了并发度为最多2个CPU核,程序跑起来后会开两个线程,将一个大集合划分为2个小集合,相当于1个集合3个条件,第一个线程在执行3个条件的起始处会执行你的localInit函数,在3个条件迭代完之后再执行你的localFinally,第二个线程也是按照同样方式执行自己的3个条件,说的有点晦涩,画一张图说明吧。

三: 第二次优化

如果你了解Task这种带有返回值的Task,这就好办了,多少个filterItemList就可以开多少个Task,反正Task底层是使用线程池承载的,所以不用怕,这样就完美的实现无锁编程。


 public static void ParallelTest3(List filterItemList)
 {
     var totalCustomerIDList = new List();
     var tasks = new Task>[filterItemList.Count];

     for (int i = 0; i < filterItemList.Count; i++)
     {
  tasks[i] = Task.Factory.StartNew((query) =>
  {
      return GetCustomerIDList(query.ToString());
  }, filterItemList[i]);
     }

     Task.WaitAll(tasks);

     for (int i = 0; i < tasks.Length; i++)
     {
  var smallCustomerIDList = tasks[i].Result;
  if (i == 0)
  {
      totalCustomerIDList.AddRange(smallCustomerIDList);
  }
  else
  {
      totalCustomerIDList = totalCustomerIDList.Intersect(smallCustomerIDList).ToList();
  }
     }

     Console.WriteLine($"最后交集客户ID:{string.Join(",", totalCustomerIDList)}");
 }

------ output -------

最后交集客户ID:1,7
Press any key to continue . . .

四:总结

我们将原来的6个lock优化到了无锁编程,但并不说明无锁编程就一定比带有lock的效率高,大家要结合自己的使用场景合理的使用和混合搭配。

好了,本篇就说到这里,希望对您有帮助。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号