Csharp/C#教程:使用Parallel Linq Extensions结合两个序列,如何才能首先产生最快的结果?分享


使用Parallel Linq Extensions结合两个序列,如何才能首先产生最快的结果?

假设我有两个序列返回整数1到5。

第一个返回1,2和3非常快,但4和5每个需要200毫秒。

public static IEnumerable FastFirst() { for (int i = 1; i  3) Thread.Sleep(200); yield return i; } } 

第二个返回1,2和3,延迟时间为200ms,但快速返回4和5。

 public static IEnumerable SlowFirst() { for (int i = 1; i < 6; i++) { if (i < 4) Thread.Sleep(200); yield return i; } } 

联合这两个序列只给出数字1到5。

 FastFirst().Union(SlowFirst()); 

我不能保证两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证为我提供解决方案。 因此,我想并行化联盟,以便最小化我的例子中的(人为的)延迟。

一个真实场景:我有一个返回一些实体的缓存,以及一个返回所有实体的数据源。 我希望能够从一个方法返回一个迭代器,该方法将请求内部并行化到缓存和数据源,以便缓存的结果尽可能快地生成。

注1:我意识到这仍然在浪费CPU周期; 我不是在问我如何防止序列迭代它们的慢元素,我怎么能尽可能快地将它们联合起来。

更新1:我已经定制了achitaka-san对接受多个生成器的响应,并使用ContinueWhenAll将BlockingCollection的CompleteAdding设置为一次。 我只是把它放在这里,因为它会因缺少注释格式而丢失。 任何进一步的反馈都会很棒!

 public static IEnumerable SelectAsync( params IEnumerable[] producer) { var resultsQueue = new BlockingCollection(); var taskList = new HashSet(); foreach (var result in producer) { taskList.Add( Task.Factory.StartNew( () => { foreach (var product in result) { resultsQueue.Add(product); } })); } Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding()); return resultsQueue.GetConsumingEnumerable(); } 

看看这个。 第一种方法只是返回结果的顺序。 第二个检查唯一性。 如果你把它们连在一起,你就会得到你想要的结果。

 public static class Class1 { public static IEnumerable SelectAsync( IEnumerable producer1, IEnumerable producer2, int capacity) { var resultsQueue = new BlockingCollection(capacity); var producer1Done = false; var producer2Done = false; Task.Factory.StartNew(() => { foreach (var product in producer1) { resultsQueue.Add(product); } producer1Done = true; if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); } }); Task.Factory.StartNew(() => { foreach (var product in producer2) { resultsQueue.Add(product); } producer2Done = true; if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); } }); return resultsQueue.GetConsumingEnumerable(); } public static IEnumerable SelectAsyncUnique(this IEnumerable source) { HashSet knownResults = new HashSet(); foreach (TResult result in source) { if (knownResults.Contains(result)) {continue;} knownResults.Add(result); yield return result; } } } 

与从数据库中提取相比,缓存几乎是即时的,因此您可以先从缓存中读取并返回这些项,然后从数据库中读取并返回除缓存中找到的项以外的项。

如果你试图将它并行化,你会增加很多复杂性,但收益却很小。

编辑:

如果源的速度没有可预测的差异,您可以在线程中运行它们并使用同步哈希集来跟踪您已经获得的项目,将新项目放入队列中,并让主线程读取从队列:

 public static IEnumerable GetParallel(Func getKey, params IEnumerable[] sources) { HashSet found = new HashSet(); List queue = new List(); object sync = new object(); int alive = 0; object aliveSync = new object(); foreach (IEnumerable source in sources) { lock (aliveSync) { alive++; } new Thread(s => { foreach (TItem item in s as IEnumerable) { TKey key = getKey(item); lock (sync) { if (found.Add(key)) { queue.Add(item); } } } lock (aliveSync) { alive--; } }).Start(source); } while (true) { lock (sync) { if (queue.Count > 0) { foreach (TItem item in queue) { yield return item; } queue.Clear(); } } lock (aliveSync) { if (alive == 0) break; } Thread.Sleep(100); } } 

测试流程:

 public static IEnumerable SlowRandomFeed(Random rnd) { int[] values = new int[100]; for (int i = 0; i < 100; i++) { int pos = rnd.Next(i + 1); values[i] = i; int temp = values[pos]; values[pos] = values[i]; values[i] = temp; } foreach (int value in values) { yield return value; Thread.Sleep(rnd.Next(200)); } } 

测试:

上述就是C#学习教程:使用Parallel Linq Extensions结合两个序列,如何才能首先产生最快的结果?分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!

 Random rnd = new Random(); foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) { Console.Write("{0:0000 }", item); } 

本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。

ctvol管理联系方式QQ:251552304

本文章地址:https://www.ctvol.com/cdevelopment/1001080.html

(0)
上一篇 2021年12月28日
下一篇 2021年12月28日

精彩推荐