Search on TextChanged with Reactive Extensions




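I implemented it as in the code below, but I wonder whether there is a better or cleaner way to do it with Rx (Reactive Extensions) operators. Creating a second observable inside the Subscribe method of the first observable feels more imperative than declarative, and the same goes for the if statement.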

    var searchStream = Observable.FromEventPattern(
            s => txtSearch.TextChanged += s,
            s => txtSearch.TextChanged -= s)
        .Throttle(TimeSpan.FromMilliseconds(300))
        .Select(evt =>
        {
            var txtbox = evt.Sender as TextBox;
            return txtbox.Text;
        });

    searchStream
        .DistinctUntilChanged()
        .ObserveOn(SynchronizationContext.Current)
        .Subscribe(searchTerm =>
        {
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);

            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAll(out partyCount)
                : partyRepository.SearchByNameAndNotes(searchTerm);

            foundParties
                .ToObservable(Scheduler.Default)
                .TakeUntil(searchStream)
                .Buffer(500)
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(searchResults =>
                {
                    this.parties.AddRange(searchResults);
                    this.partyBindingSource.ResetBindings(false);
                },
                innerEx => { },
                () => { });
        },
        ex => { },
        () => { });


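I think you want something like this. EDIT: From your comments, I see you have a synchronous repository API. I'll leave the asynchronous version in and add a synchronous version after it. Notes are inline: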



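    // The generic arguments were lost in the source; Party is an assumed
    // name for the element type.
    public interface IPartyRepository
    {
        Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
        Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
    }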


    var searchStream = Observable.FromEventPattern(
            s => txtSearch.TextChanged += s,
            s => txtSearch.TextChanged -= s)
        .Select(evt => txtSearch.Text) // better to select on the UI thread
        .Throttle(TimeSpan.FromMilliseconds(300))
        .DistinctUntilChanged()
        // placement of this is important to avoid races updating the UI
        .ObserveOn(SynchronizationContext.Current)
        .Do(_ =>
        {
            // I like to use Do to make in-stream side-effects explicit
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);
        })
        // This is "the money" part of the answer:
        // Don't subscribe, just project the search term
        // into the query...
        .Select(searchTerm =>
        {
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAllAsync(out partyCount)
                : partyRepository.SearchByNameAndNotesAsync(searchTerm);

            // I assume the intention of the Buffer was to load
            // the data into the UI in batches. If so, you can use Buffer from nuget
            // package Ix-Main like this to get the IEnumerable batched up
            // without splitting it up into unit sized pieces first
            return foundParties
                // this ToObs gets us into the monad
                // and returns IObservable<IEnumerable<T>>
                // (T being the repository's element type)
                .ToObservable()
                // the ToObs here gets us into the monad from
                // the IEnumerable<IList<T>> returned by Buffer
                // and the SelectMany flattens so the output
                // is IObservable<IList<T>>
                .SelectMany(x => x.Buffer(500).ToObservable())
                // placement of this is again important to avoid races updating the UI
                // erroneously putting it after the Switch is a very common bug
                .ObserveOn(SynchronizationContext.Current);
        })
        // At this point we have IObservable<IObservable<IList<T>>>
        // Switch flattens and returns the most recent inner IObservable,
        // cancelling any previous pending set of batched results
        // superseded due to a textbox change
        // ie the previous inner IObservable<...> if it was incomplete
        // - it's the equivalent of your TakeUntil, but a bit neater
        .Switch()
        .Subscribe(searchResults =>
        {
            this.parties.AddRange(searchResults);
            this.partyBindingSource.ResetBindings(false);
        },
        ex => { },
        () => { });



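    // The generic arguments were lost in the source; Party is an assumed
    // name for the element type.
    public interface IPartyRepository
    {
        IEnumerable<Party> GetAll(out long partyCount);
        IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
    }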

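Personally, I wouldn't recommend making a repository interface synchronous like this. Why? It will typically do IO, so you end up wastefully blocking a thread.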

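You might say the client could call it from a background thread, or that you could wrap the call in a task, but I don't think that is the right way to go.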


    var searchStream = Observable.FromEventPattern(
            s => txtSearch.TextChanged += s,
            s => txtSearch.TextChanged -= s)
        .Select(evt => txtSearch.Text)
        .Throttle(TimeSpan.FromMilliseconds(300))
        .DistinctUntilChanged()
        .ObserveOn(SynchronizationContext.Current)
        .Do(_ =>
        {
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);
        })
        .Select(searchTerm =>
            // Here we wrap the synchronous repository into an
            // async call. Note it's simply not enough to call
            // ToObservable(Scheduler.Default) on the enumerable
            // because this can actually still block up to the point that the
            // first result is yielded. Doing as we have here,
            // we guarantee the UI stays responsive
            Observable.Start(() =>
            {
                long partyCount;
                var foundParties = string.IsNullOrEmpty(searchTerm)
                    ? partyRepository.GetAll(out partyCount)
                    : partyRepository.SearchByNameAndNotes(searchTerm);

                return foundParties;
            }) // Note you can supply a scheduler, default is Scheduler.Default
            .SelectMany(x => x.Buffer(500).ToObservable())
            .ObserveOn(SynchronizationContext.Current))
        .Switch()
        .Subscribe(searchResults =>
        {
            this.parties.AddRange(searchResults);
            this.partyBindingSource.ResetBindings(false);
        },
        ex => { },
        () => { });
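If the role of Switch is hard to see inside the full query, here is a minimal console sketch of that one operator in isolation. It is not part of the code above: `SwitchDemo`, `searches`, `firstResults` and `secondResults` are hypothetical names, the subjects stand in for the per-search result streams, and it assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo type, for illustration only.
public static class SwitchDemo
{
    public static List<string> Run()
    {
        // Outer stream: each element is the result stream of one search.
        var searches = new Subject<IObservable<string>>();
        var firstResults = new Subject<string>();
        var secondResults = new Subject<string>();
        var seen = new List<string>();

        using (searches.Switch().Subscribe(item => seen.Add(item)))
        {
            searches.OnNext(firstResults);   // first search starts
            firstResults.OnNext("a1");       // delivered
            searches.OnNext(secondResults);  // new search: Switch unsubscribes from firstResults
            firstResults.OnNext("a2");       // dropped, the first search was superseded
            secondResults.OnNext("b1");      // delivered
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints "a1,b1"
    }
}
```

The late "a2" never reaches the subscriber, which is the same effect the TakeUntil in the question was after, without subscribing inside a Subscribe.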




