非重播热观察
原始问题
我有一个场景,我有多个IObservable
序列,我想与Merge
结合,然后听。 但是,如果其中一个产生错误,我不希望它崩溃其他流的所有内容,以及重新订阅序列(这是一个’永久’序列)。
我通过在合并之前向流添加Retry()
来执行此操作,即:
IEnumerable<IObservable> observables = GetObservables(); observables .Select(o => o.Retry()) .Merge() .Subscribe(/* Do subscription stuff */);
但是,当我想测试时会出现问题。 我想测试的是,如果observables
一个IObservable
产生一个OnError
,那么其他的仍应该能够发送它们的值并且它们应该被处理
我以为我只使用两个Subject
代表两个observables
IObservable
; 一个发送OnError(new Exception())
,另一个发送OnNext(1)
。 但是,似乎Subject
将重放新订阅的所有先前值(实际上是Retry()
),将测试转换为无限循环。
我尝试通过创建一个手动IObservable
来解决它,该手动IObservable
在第一个订阅上产生错误,之后是一个空序列,但它感觉很hacky:
var i = 0; var nErrors = 2; var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create(o => { i++; if (i < nErrors) { return Observable.Throw(new Exception()).Subscribe(o); } else { return Observable.Empty().Subscribe(o); } });
我使用Subject
还是以错误的方式思考Retry()
? 还有其他想法吗? 你会如何解决这种情况?
更新
好的,这是我想要的大理石图,并认为 Retry()
作用。
o = message, X = error. ------o---o---X Retry() -> ---o---o---X Retry() -> ...
我的问题可能更多,因为我没有一个好的库存类来使用前测试,因为Subject
希望重播我之前的所有错误。
更新2
这是一个测试用例,显示了我对Subject
重放其值的意思。 如果我说它以冷的方式做到这一点,我是否正确使用该术语? 我知道Subject
是一种创造热的可观察性的方式,但这种行为对我来说仍然感觉“冷”。
var onNext = false; var subject = new Subject(); subject.Retry().Subscribe(x => onNext = true); subject.OnError(new Exception()); subject.OnNext(1); Assert.That(onNext, Is.True);
根据您更新的要求(您希望重试失败的可观察对象,而不是仅仅想忽略它们),我们可以提出一个有效的解决方案。
首先,理解冷可观察(在每个订阅上重新创建)和热可观察(无论订阅如何都存在)之间的区别是很重要的。 你不能Retry()
一个热的observable,因为它不知道如何重新创建底层事件。 也就是说,如果一个热的可观察错误,它就永远消失了。
Subject
创建一个热的可观察对象,在某种意义上你可以在没有订阅者的情况下调用OnNext
,它将按预期运行。 要将热的observable转换为冷可观察对象,可以使用Observable.Defer
,它将包含该可观察对象的“创建订阅”逻辑。
总而言之,这是修改后的原始代码:
var success = new Subject(); var error = new Subject (); var observables = new List> { Observable.Defer(() => {success = new Subject (); return success.AsObservable();}), Observable.Defer(() => {error = new Subject (); return error.AsObservable();}) }; observables .Select(o => o.Retry()) .Merge() .Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));
和测试(类似于之前):
success.OnNext(1); error.OnError(new Exception("test")); success.OnNext(2); error.OnNext(-1); success.OnCompleted(); error.OnCompleted();
并按预期输出:
1 2 -1 done
当然,您需要根据您可观察到的内容来显着修改此概念。 使用主题进行测试与使用它们是真实的不同。
我还要注意这个评论:
但是,似乎Subject将重放新订阅的所有先前值(实际上是Retry()),将测试转换为无限循环。
不是真的 – Subject
不会这样做。 您的代码还有一些其他方面导致无限循环基于Retry
重新创建订阅的事实,并且订阅会在某个时刻创建错误。
原始答案 (完成)
问题是Retry()
不能做你想做的事。 从这里:
https://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
重复源可观察序列retryCount次或直到它成功终止。
这意味着Retry
将不断尝试重新连接到底层的observable,直到它成功并且不会抛出错误。
我的理解是你实际上希望observable中的exception被忽略 ,而不是重试。 这将做你想要的事情:
observables .Select(o => o.Catch((Func>)(e => Observable.Empty()))) .Merge() .Subscribe(/* subscription code */);
这使用Catch
来捕获具有exception的observable,并在该点将其替换为空的observable。
以下是使用主题的完整测试:
var success = new Subject(); var error = new Subject (); var observables = new List> { success.AsObservable(), error.AsObservable() }; observables .Select(o => o.Catch((Func>)(e => Observable.Empty ()))) .Merge() .Subscribe(Observer.Create (Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"))); success.OnNext(1); error.OnError(new Exception("test")); success.OnNext(2); success.OnCompleted();
这正如预期的那样产生:
上述就是C#学习教程:非重播热观察分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!
1 2 done
本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。
ctvol管理联系方式QQ:251552304
本文章地址:https://www.ctvol.com/cdevelopment/951754.html