如何使用C#任务并行库和IProducerConsumerCollection实现通用回调?
我有一个组件向基于Web的API提交请求,但必须限制这些请求,以免违反API的数据限制。 这意味着所有请求必须通过队列来控制它们的提交速率,但它们可以(并且应该)同时执行以实现最大吞吐量。 每个请求必须在完成后的某个时刻将某些数据返回给调用代码。
我正在努力创建一个很好的模型来处理数据的返回。
使用BlockingCollection
我不能只从Schedule
方法返回Task
,因为入队和出队进程位于缓冲区的两端。 因此,我创建了一个RequestItem
类型,其中包含Action<Task>
forms的回调。
这个想法是,一旦一个项目被从队列中拉出,就可以用启动的任务调用回调,但是我已经丢失了那个点的generics类型参数,而且我还在使用reflection和各种恶作剧(如果它是甚至可能)。
例如:
public class RequestScheduler { private readonly BlockingCollection _queue = new BlockingCollection(); public RequestScheduler() { this.Start(); } // This can't return Task, so returns void. // Instead RequestItem is generic but this poses problems when adding to the queue public void Schedule(RequestItem request) { _queue.Add(request); } private void Start() { Task.Factory.StartNew(() => { foreach (var item in _queue.GetConsumingEnumerable()) { // I want to be able to use the original type parameters here // is there a nice way without reflection? // ProcessItem submits an HttpWebRequest Task.Factory.StartNew(() => ProcessItem(item)) .ContinueWith(t => { item.Callback(t); }); } }); } public void Stop() { _queue.CompleteAdding(); } } public class RequestItem : IRequestItem { public IOperation Operation { get; set; } public Action<Task> Callback { get; set; } }
如何从缓冲区中提取请求并将其提交给API,如何继续缓冲我的请求,但将Task
返回给客户端?
首先,您可以从Schedule()
返回Task
,您只需要使用TaskCompletionSource
。
其次,为了解决通用性问题,你可以隐藏所有内容(非generics) Action
。 在Schedule()
,使用完全符合您需要的lambda创建一个动作。 消费循环然后将执行该动作,它不需要知道内部是什么。
第三,我不明白你为什么要在循环的每次迭代中开始一个新的Task
。 首先,它意味着你实际上不会受到任何限制。
通过这些修改,代码可能如下所示:
上述就是C#学习教程:如何使用C#任务并行库和IProducerConsumerCollection实现通用回调?分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!
public class RequestScheduler { private readonly BlockingCollection m_queue = new BlockingCollection (); public RequestScheduler() { this.Start(); } private void Start() { Task.Factory.StartNew(() => { foreach (var action in m_queue.GetConsumingEnumerable()) { action(); } }, TaskCreationOptions.LongRunning); } public Task Schedule (IOperation operation) { var tcs = new TaskCompletionSource (); Action action = () => { try { tcs.SetResult(ProcessItem(operation)); } catch (Exception e) { tcs.SetException(e); } }; m_queue.Add(action); return tcs.Task; } private T ProcessItem(IOperation operation) { // whatever } }
本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。
ctvol管理联系方式QQ:251552304
本文章地址:https://www.ctvol.com/cdevelopment/1022267.html