Reactive Extensions超时不会停止序列?
我正在尝试创建一个IObservable
,如果在最后5秒内收到UDP消息,则返回true
,如果发生超时,则返回false。
到目前为止我有这个:
public IObservable GettingUDPMessages(IPEndPoint localEP) { var udp = BaseComms.UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")) .Select(s => true); return Observable .Timeout(udp, TimeSpan.FromSeconds(5)) .Catch(Observable.Return(false)); }
这个问题是: –
我可以使用Subject
但是当没有更多订阅者时我需要小心处理UDPBaseStringListener
observable。
更新
每次我收到UDP消息时,我都希望它返回true
。 如果我在最近5秒内没有收到UDP消息,我希望它返回false
。
正如Bj0所指出的,具有BufferWithTime
的解决方案在收到数据点后不会立即返回数据点,并且在接收到数据点后不会重置缓冲区超时。
使用Rx Extensions 2.0,您可以使用接受超时和大小的新缓冲区重载来解决这两个问题:
static IObservable GettingUDPMessages(IPEndPoint localEP) { return BaseComms .UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")) .Buffer(TimeSpan.FromSeconds(5), 1) .Select(s => s.Count > 0) .DistinctUntilChanged(); }
缓冲区的问题是,当你获得一个新值时,“超时”间隔不会被重置,缓冲区窗口只是相互跟随的时间片段(在这种情况下为5秒)。 这意味着,根据您收到最后一个值的时间,您可能需要等待几乎两倍的超时值。 这也可能会错过超时:
should timeout here v 0s 5s 10s 15s |x - x - x | x - - - - | - - - x -| ... true true true
但是,IObservable.Throttle每次进入新值时都会自行重置,并且仅在时间跨度结束后生成一个值(最后一个传入值)。 这可以用作超时并与IObservable合并以将“超时”值插入到流中:
var obs = BaseComms.UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")); return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) ) .Select( x => false ) ) .DistinctUntilChanged();
一个有效的LINQPad示例:
var sub = new Subject(); var script = sub.Timestamp() .Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp()) .Subscribe( x => { x.Dump("val"); }); Thread.Sleep(1000); sub.OnNext(1); sub.OnNext(2); Thread.Sleep(10000); sub.OnNext(5);
在2s超时后,将-1插入流中。
我建议避免使用Timeout
– 它会导致exception,而exception编码则很糟糕。
此外,似乎只有你的观察值在一个值之后停止才有意义。 您可能需要更多地解释您希望行为是什么。
我目前解决您的问题的方法是:
public IObservable GettingUDPMessages(IPEndPoint localEP) { return Observable.Create(o => { var subject = new AsyncSubject (); return new CompositeDisposable( Observable.Amb( BaseComms .UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")) .Select(s => true), Observable .Timer(TimeSpan.FromMilliseconds(10.0)) .Select(_ => false) ).Take(1).Subscribe(subject), subject.Subscribe(o)); }); }
这有帮助吗?
如果您不希望序列停止,只需将其包装在Defer + Repeat中:
上述就是C#学习教程:Reactive Extensions超时不会停止序列?分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!
Observable.Defer(() => GettingUDPMessages(endpoint) .Repeat();
本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。
ctvol管理联系方式QQ:251552304
本文章地址:https://www.ctvol.com/cdevelopment/955742.html