Learning RxJava (Part 5)

梦幻之云, 2022-11-22


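Loading data from multiple sources with RxJava

Requirements:
1. Occasionally fetch fresh data from the network.
2. Return data as quickly as possible (by caching the results of network requests).

Let's see how to implement this with RxJava.

We expose each data source as an Observable, then combine the sources with the concat and first operators: concat queries the sources one after another, and first stops at the first item that is acceptable.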
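To make the concat-then-first idea concrete before the RxJava version, here is a minimal plain-Java sketch of the same control flow. It is an analogue, not RxJava itself: FirstValid and firstMatching are hypothetical names introduced only for this illustration, and each source is modeled as a Supplier instead of an Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of RxJava: a plain-Java analogue of concat(...).first(predicate).
final class FirstValid {
    private FirstValid() {}

    // Queries each source in order and returns the first non-null value that passes the test.
    // Sources after the accepted one are never queried.
    static <T> T firstMatching(List<Supplier<T>> sources, Predicate<T> isValid) {
        for (Supplier<T> source : sources) {
            T value = source.get();
            if (value != null && isValid.test(value)) {
                return value;
            }
        }
        // No source produced an acceptable value.
        throw new NoSuchElementException("no source produced a valid value");
    }

    public static void main(String[] args) {
        List<Supplier<String>> sources = new ArrayList<>();
        sources.add(() -> null);            // "memory": empty
        sources.add(() -> "from disk");     // "disk": has a value
        sources.add(() -> "from network");  // "network": not queried in this run
        System.out.println(firstMatching(sources, v -> !v.isEmpty())); // prints "from disk"
    }
}
```

The ordering of the list encodes the priority: cheaper sources go first, and the expensive one is only reached when everything before it is empty or rejected.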
1. Data (the simulated data model)

public class Data {
    private static final long STALE_MS = 5 * 1000; // Data is stale after 5 seconds

    final String value;

    final long timestamp;

    public Data(String value) {
        this.value = value;
        this.timestamp = System.currentTimeMillis();
    }

    public boolean isUpToDate() {
        return System.currentTimeMillis() - timestamp < STALE_MS;
    }
}
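The freshness rule is easier to reason about when the current time is passed in explicitly. The following sketch assumes a hypothetical Freshness helper (not part of the original code) that applies the same comparison as isUpToDate and shows where the 5-second boundary falls.

```java
// Hypothetical variant of the freshness check with the current time passed in explicitly.
final class Freshness {
    static final long STALE_MS = 5 * 1000; // same threshold as Data.STALE_MS

    private Freshness() {}

    // Same comparison as Data.isUpToDate(), but testable without waiting.
    static boolean isUpToDate(long timestampMs, long nowMs) {
        return nowMs - timestampMs < STALE_MS;
    }

    public static void main(String[] args) {
        long created = 10_000;
        System.out.println(isUpToDate(created, created + 4_999)); // true
        System.out.println(isUpToDate(created, created + 5_000)); // false: exactly 5 s old counts as stale
    }
}
```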

2. Source (defines the observable sources)

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

public class Source {

    // Memory cache of data
    private Data memory = null;

    // What's currently "written" on disk
    private Data disk = null;

    // Each "network" response is different
    private int requestNumber = 0;

    // In order to simulate memory being cleared, but data still on disk
    public void clearMemory() {
        System.out.println("Wiping memory...");
        memory = null;
    }

    Observable<Data> memory() {
        Observable<Data> observable = Observable.create(new Observable.OnSubscribe<Data>() {
            @Override
            public void call(Subscriber<? super Data> subscriber) {
                // Emits null when the memory cache is empty; the consumer filters it out
                subscriber.onNext(memory);
                subscriber.onCompleted();
            }
        });
        // Apply the shared logging Transformer to this Observable
        return observable.compose(logSource("=====MEMORY"));
    }

    Observable<Data> disk() {
        Observable<Data> observable = Observable.create(new Observable.OnSubscribe<Data>() {
            @Override
            public void call(Subscriber<? super Data> subscriber) {
                subscriber.onNext(disk);
                subscriber.onCompleted();
            }
        });
        // Copy whatever was read from disk into the memory cache
        return observable.doOnNext(new Action1<Data>() {
            @Override
            public void call(Data data) {
                memory = data;
            }
        }).compose(logSource("=====DISK"));
    }

    Observable<Data> network() {
        Observable<Data> observable = Observable.create(new Observable.OnSubscribe<Data>() {
            @Override
            public void call(Subscriber<? super Data> subscriber) {
                requestNumber++;
                subscriber.onNext(new Data("Server Response #" + requestNumber));
                subscriber.onCompleted();
            }
        });
        // doOnNext returns a new Observable, so it has to be part of the returned chain;
        // otherwise the response is never saved to memory and disk
        return observable.doOnNext(new Action1<Data>() {
            @Override
            public void call(Data data) {
                memory = data;
                disk = data;
            }
        }).compose(logSource("========NETWORK"));
    }

    Observable.Transformer<Data, Data> logSource(final String resource) {
        return new Observable.Transformer<Data, Data>() {
            @Override
            public Observable<Data> call(Observable<Data> dataObservable) {
                return dataObservable.doOnNext(new Action1<Data>() {
                    @Override
                    public void call(Data data) {
                        if (data == null) {
                            System.out.println(resource + " does not have any data.");
                        }
                        else if (!data.isUpToDate()) {
                            System.out.println(resource + " has stale data.");
                        }
                        else {
                            System.out.println(resource + " has the data you are looking for!");
                        }
                    }
                });
            }
        };
    }
}
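The caching that Source is meant to perform (a disk hit refills memory, a network hit is saved to both memory and disk) can be sketched without RxJava. The class below is a hypothetical stand-in written for illustration only; it leaves out the staleness check and returns a label so the answering tier is visible.

```java
// Hypothetical plain-Java stand-in for the Source class: no RxJava, no staleness check.
final class WriteBackCache {
    private String memory;          // fastest tier, may be wiped at any time
    private String disk;            // survives a memory wipe
    private int requestNumber = 0;  // each "network" response is different

    void clearMemory() {
        memory = null;
    }

    // Returns "<tier>: <value>" so the caller can see which tier answered.
    String load() {
        if (memory != null) {
            return "memory: " + memory;
        }
        if (disk != null) {
            memory = disk;          // a disk hit repopulates memory
            return "disk: " + disk;
        }
        requestNumber++;
        String response = "Server Response #" + requestNumber;
        memory = response;          // a network hit is saved to both tiers
        disk = response;
        return "network: " + response;
    }

    public static void main(String[] args) {
        WriteBackCache cache = new WriteBackCache();
        System.out.println(cache.load()); // network: Server Response #1
        System.out.println(cache.load()); // memory: Server Response #1
        cache.clearMemory();
        System.out.println(cache.load()); // disk: Server Response #1
        System.out.println(cache.load()); // memory: Server Response #1
    }
}
```

In the RxJava version these writes are the doOnNext side effects. They only run for subscribers of the Observable that doOnNext returns, which is why that Observable has to be the one handed back to the caller.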

3. Test class

import android.os.Bundle;
// AppCompatActivity comes from android.support.v7.app or androidx.appcompat.app,
// depending on the support library the project uses
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class MainActivity extends AppCompatActivity {
    Source mSource = new Source();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        // concat: subscribes to the Observables one after another, in the given order
        final Observable<Data> observable = Observable.concat(
                mSource.memory(),
                mSource.disk(),
                mSource.network())
                // first: emits only the first item that passes the predicate, then completes
                .first(new Func1<Data, Boolean>() {
                    @Override
                    public Boolean call(Data data) {
                        // Accept only data that is non-null and not stale
                        return data != null && data.isUpToDate();
                    }
                });
        // Request the latest data once per second by re-subscribing to the Observable
        Observable.interval(1, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<Data>>() {
            @Override
            public Observable<Data> call(Long aLong) {
                return observable;
            }
        }).subscribe(new Action1<Data>() {
            @Override
            public void call(Data data) {
                System.out.println("====Received: " + data.value);
            }
        });
        // Clear the memory cache every 3 seconds
        Observable.interval(3, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                mSource.clearMemory();
            }
        });
        // Wait 15 seconds while the timers above fire; note that this blocks the main thread
        sleep(15 * 1000);
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            // Ignore
        }
    }
}
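To get a feel for which source should answer each request under this schedule (one request per second, memory wiped every 3 seconds, data stale after 5 seconds), the following sketch replays the first 15 seconds on a virtual clock. It is a simplified model, not the RxJava code: TimelineSim is a hypothetical class, timestamps fall on exact seconds, and the wipe is assumed to run before the request of the same second. In a real run the two timers race and timestamps drift by a few milliseconds, so the log may differ at the boundaries.

```java
// Hypothetical simulation of the test schedule on a virtual clock (no RxJava, no real waiting).
final class TimelineSim {
    static final long STALE_MS = 5_000;
    static final long NONE = -1; // marks an empty tier

    private TimelineSim() {}

    // Returns one letter per request: M = memory, D = disk, N = network.
    static String run(int seconds) {
        long memoryTs = NONE; // creation time of the value held in memory
        long diskTs = NONE;   // creation time of the value held on disk
        StringBuilder served = new StringBuilder();
        for (int s = 1; s <= seconds; s++) {
            long now = s * 1_000L;
            if (s % 3 == 0) {
                memoryTs = NONE; // assumption: the wipe runs before the request of the same second
            }
            if (fresh(memoryTs, now)) {
                served.append('M');
            } else if (fresh(diskTs, now)) {
                memoryTs = diskTs; // a disk hit repopulates memory
                served.append('D');
            } else {
                memoryTs = now;    // a network hit is saved to both tiers
                diskTs = now;
                served.append('N');
            }
        }
        return served.toString();
    }

    static boolean fresh(long ts, long now) {
        return ts != NONE && now - ts < STALE_MS;
    }

    public static void main(String[] args) {
        System.out.println(run(15)); // NMDMMNMMDMNDMMD
    }
}
```

Under these assumptions the network is hit only three times in 15 requests; every other request is served from memory or disk.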

Let's look at the final output:

[Screenshot: log output of the run]

Finally, here is the download link for the code:
Click here to download the source code

