public class RxSearchObservable {
public static Observable<String> fromView(SearchView searchView) {
//Subject的理解看这里
final PublishSubject<String> subject = PublishSubject.create();
searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String s) {
subject.onComplete();
return true;
}
@Override
public boolean onQueryTextChange(String text) {
subject.onNext(text);
return true;
}
});
return subject;
}
}
RxSearchObservable.fromView(searchView)
.debounce(300, TimeUnit.MILLISECONDS) //时间跨度300毫秒以上才触发
.filter(new Predicate<String>() { //过滤,下面的test是过滤条件
@Override
public boolean test(String text) throws Exception {
if (text.isEmpty()) {
return false;
} else {
return true;
}
}
})
.distinctUntilChanged() //去重,比如一开始是abc,用户在300毫秒内删除c再输入c,300毫秒后也不会重新再次触发。
.switchMap(new Function<String, ObservableSource<String>>() { //只取最新的结果,比如请求ab很久了还没回来又请求了abc,只取最后的abc的结果。
@Override
public ObservableSource<String> apply(String query) throws Exception {
return dataFromNetwork(query);
}
})
.subscribeOn(Schedulers.io()) //io耗时操作子线程
.observeOn(AndroidSchedulers.mainThread()) //ui更新线程
.subscribe(new Consumer<String>() {
@Override
public void accept(String result) throws Exception {
textViewResult.setText(result);
}
});