Csharp/C#教程:Reactive Extensions超时不会停止序列?分享


Reactive Extensions超时不会停止序列?

我正在尝试创建一个IObservable ,如果在最后5秒内收到UDP消息,则返回true ,如果发生超时,则返回false。

到目前为止我有这个:

 public IObservable GettingUDPMessages(IPEndPoint localEP) { var udp = BaseComms.UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")) .Select(s => true); return Observable .Timeout(udp, TimeSpan.FromSeconds(5)) .Catch(Observable.Return(false)); } 

这个问题是: –

我可以使用Subject但是当没有更多订阅者时我需要小心处理UDPBaseStringListener observable。

更新

每次我收到UDP消息时,我都希望它返回true 。 如果我在最近5秒内没有收到UDP消息,我希望它返回false

正如Bj0所指出的,具有BufferWithTime的解决方案在收到数据点后不会立即返回数据点,并且在接收到数据点后不会重置缓冲区超时。

使用Rx Extensions 2.0,您可以使用接受超时和大小的新缓冲区重载来解决这两个问题:

 static IObservable GettingUDPMessages(IPEndPoint localEP) { return BaseComms .UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")) .Buffer(TimeSpan.FromSeconds(5), 1) .Select(s => s.Count > 0) .DistinctUntilChanged(); } 

缓冲区的问题是,当你获得一个新值时,“超时”间隔不会被重置,缓冲区窗口只是相互跟随的时间片段(在这种情况下为5秒)。 这意味着,根据您收到最后一个值的时间,您可能需要等待几乎两倍的超时值。 这也可能会错过超时:

  should timeout here v 0s 5s 10s 15s |x - x - x | x - - - - | - - - x -| ... true true true 

但是,IObservable.Throttle每次进入新值时都会自行重置,并且仅在时间跨度结束后生成一个值(最后一个传入值)。 这可以用作超时并与IObservable合并以将“超时”值插入到流中:

 var obs = BaseComms.UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")); return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) ) .Select( x => false ) ) .DistinctUntilChanged(); 

一个有效的LINQPad示例:

 var sub = new Subject(); var script = sub.Timestamp() .Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp()) .Subscribe( x => { x.Dump("val"); }); Thread.Sleep(1000); sub.OnNext(1); sub.OnNext(2); Thread.Sleep(10000); sub.OnNext(5); 

在2s超时后,将-1插入流中。

我建议避免使用Timeout – 它会导致exception,而exception编码则很糟糕。

此外,似乎只有你的观察值在一个值之后停止才有意义。 您可能需要更多地解释您希望行为是什么。

我目前解决您的问题的方法是:

 public IObservable GettingUDPMessages(IPEndPoint localEP) { return Observable.Create(o => { var subject = new AsyncSubject(); return new CompositeDisposable( Observable.Amb( BaseComms .UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains("running")) .Select(s => true), Observable .Timer(TimeSpan.FromMilliseconds(10.0)) .Select(_ => false) ).Take(1).Subscribe(subject), subject.Subscribe(o)); }); } 

这有帮助吗?

如果您不希望序列停止,只需将其包装在Defer + Repeat中:

上述就是C#学习教程:Reactive Extensions超时不会停止序列?分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!

 Observable.Defer(() => GettingUDPMessages(endpoint) .Repeat(); 

本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。

ctvol管理联系方式QQ:251552304

本文章地址:https://www.ctvol.com/cdevelopment/955742.html

(0)
上一篇 2021年11月21日
下一篇 2021年11月21日

精彩推荐