0
点赞
收藏
分享

微信扫一扫

为什么 RxJava 有 Single / Maybe 等单发数据类型,而 Flow 没有?

Coroutine Flow 与 RxJava 都是流式数据处理框架, 所以经常被拿来做比较,本文比较一下他们的数据类型。

Rx 与 Flow 在单发数据支持上的不同

RxJava 支持多种数据类型

  • ​Observable​​ :流式数据类型
  • ​Flowable​​:与 Observable 类似,支持背压
  • ​Single​​:单发数据类型,只能且必须发射一个数据
  • ​Maybe​​:单发数据类型,发射零个或一个数据
  • ​Completable​​:不发射任何数据,只通知流的结束。

以上,​​Single<T>​​​,​​Maybe<T>​​​ 以及 ​​Completable​​​ 都至多只能发射一个数据(单发数据类型)。而反观 Coroutine Flow,只提供了 ​​Flow<T>​​​ 这一种类型,对标 ​​Observable<T>​​​ 或 ​​Flowable<T>​​ (Flow 天然支持背压)的流式数据。同为流式框架,为什么 Rx 需要支持单发数据类型,而 Flow 不提供不支持呢?

Rx 支持单发数据主要源于以下三个原因(或目的),而在这几点对于 Flow 却构不成问题:

RxJava 支持单发的原因

Flow 不支持单发的原因

线程切换

RxJava 同时是一个异步框架,提供 ​​observeOn​​、​​subscribeOn​​ 等线程操作符。在 Java 时代,缺少称手的多线程工具,Rx 对于单发数据也是最好的选择之一。

进入 Kotlin 时代 Coroutine 提供了足够的异步处理工具,单发数据使用挂起函数实现足矣。Flow 的线程切换也是构筑在 Coroutine 之上。

代码直观

RxJava 的操作符帮助单发数据实现链式调用,避免回调。比如通过 ​​zip​​, ​​concat​​ 等实现单发数据的组合,或者基于 ​​switchIfEmpty​​ 等实现单发数据的选择逻辑等。

Coroutine 可以使用同步调用的方式完成异步,无需再借助链式调用语法来规避回调。

类型转换

很多业务场景都涉及单发与流式数据的转换,RxJava 为这些转换提供操作符支持。比如 ​​toObservable​​ 或者 ​​flatMapObservable​​ 将单发数据转成流式数据,反之则可以通过 ​​first​​, ​​toList​​ 等将流式数据转成单发数据

Flow 也提供了双向转换,而且更加简单,比如 ​​toList​​ 直接输出拆箱后的数据类型 ​​T​​,无需为单发数据专门定义装箱类型。

总结起来,RxJava 在很多方面弥补了语言本身的不足,能力越大责任也越大,Rx 对于单发或是流式数据的场景都要有所考虑。而 Kotlin 通过 Coroutine 解决了大部分异步场景的开发需要。Flow 只专心于流式数据处理即可,虽然你依然可以使用 Flow 接受或发送单发数据,但是官方并不推荐这么做,自然也就不提供额外的单发数据类型。

接下来,通过与 Rx 的对比来具体了解一下 Coroutine 是如何对单发数据提供支持的。

线程切换

下面通过例子对比一下 Rx 与 Coroutine 的线程切换。

首先,我们模拟一个 RxJava 中的单发数据请求:

fun readFromRemoteRx(data: String = "data"): Single<String> {
return Single.create { it.onSuccess(data) }
.delay(100, TimeUnit.MILLISECONDS)
.doOnSuccess { println("read from remote: $it") }
.subscribeOn(Schedulers.io())
}

如上,​​delay​​ 模拟 IO 的延时,​​subscribeOn​​ 指定数据请求发生在 IO 线程。

前面说过,线程切换这种事情已经不是 Flow 的主要职责了。在 Coroutine 中,单发请求使用挂起函数即可:

suspend fun readFromRemote(data: String = "data"): String {
delay(100)
println("read from remote: $data")
return data
}

如上,我们用挂起函数定义单发数据,在协程中通过 ​​withContext​​ 就可以切换到 IO 线程。

代码直观

Coroutine 处理单发数据的代码相对于 Rx 更加简洁。

选择逻辑

先看一个单发数据选择逻辑的例子, 在 Rx 我们通过操作符进行选择:

fun readFromCacheRx(data: String? = null): Maybe<String> {
return run {
if (data != null) Maybe.just(data)
else Maybe.empty()
}.delay(100, TimeUnit.MILLISECONDS)
.doOnSuccess { println("read from cache: $it") }
}

fun test() {
readFromCacheRx(null) // pass "data" to check when cache has data
.switchIfEmpty(readFromRemoteRx())
.subscribeOn(Schedulers.io())
.test()
}

如上,​​readFromCacheRx​​ 使用 ​​Maybe​​ 类型模拟本地数据源的请求结果,当本地没有数据时请求网络远程数据。 Rx 基于 ​​switchIfEmpty​​ 完成条件选择逻辑,否则我们只能在异步回调中做判断。

在 Kotlin 时代,我们在 Coroutine 中用挂起函数实现选择逻辑:

suspend fun readFromCache(data: String? = null): String? {
delay(100)
println("read from cache: $data")
return data
}

fun test() {
runBlocking {
withContext(Dispatchers.IO) {
val data = readFromCache() ?: readFromRemote()
}
}
}

​readFromCache​​ 返回一个 ​​Nullable​​ 类型,直接使用 ​​?:​​ 即可,基于协程的同步调用优势,可以命令式地写任何控制语句。

组合逻辑

再看一个组合逻辑的例子,Rx 使用 ​​zip​​ 将两个单发数据组合成成一个新的单发数据:

fun test() {
readFromRemoteRx().zipWith(readFromRemote2Rx()) { res, res2 ->
"$res & $res2"
}.doOnSuccess { println("read from remote: $it") }
.subscribeOn(Schedulers.io())
.test()
}

/*
output:
-------------------
read from remote: data & data
*/

Coroutine 的实现同样的逻辑则非常简单,使用 ​​async​​ + ​​await​​ 用命令式语句即可:

fun test() {
runBlocking {
val res = async { readFromRemote() }
val res2 = async { readFromRemote2() }
val data = "${res.await()} & ${res.await()}"
println("read from remote: $data")
}
}
复制代码

类型转换

接下来对比一下单发与流式的数据转换。

单发 > 流式

Rx 可以使用 ​​toObservable​​ 或者 ​​flatMapObservable​​ 将单发类型转成 ​​Observable​​:

readFromCacheRx()
.flatMapObservable { Observable.just(it) }
.doOnNext { println("read from cache: $it") }
.doOnComplete { println("complete") }
.test()

/*
output:
-------------------
read from cache: null
complete
*/

由于 ​​readFromCacheRx​​ 没有发射任何数据,所以没有 ​​doOnNext​​ 的日志输出。

协程的单发转流式数据很简单,​​flow {...}​​ 是 Flow 的构造器,内部可以直接调用挂起函数,如果需要还可以使用 ​​withContext​​ 切换线程。

runBlocking {
flow { readFromCache()?.let { emit(it) } }
.onCompletion { println("complete") }
.collect { println("next: $it") }
}

我们常常会组合多个单发数据来实现某些业务逻辑。比如 Rx 中使用 merge 组合多个数据源的读取结果,当本地 Cache 有数据时会先行发送,这有利于冷启后的首屏快速显示

Observable.merge(
readFromCacheRx().toObservable(),
readFromRemoteRx().toObservable()
).test()

同样的逻辑,在 Flow 中同样可以基于挂起函数实现。

flowOf(
flow { emit(readFromRemote()) }, flow { emit(readFromRemote()) })
.flattenMerge()
.collect { println("$it") }

流式 > 单发

Rx 中我们可以将一个 ​​Observable​​ 转化成 ​​Single​​ 数据:

fun test() {
Observable.just(1, 2, 3)
.toList()
.doOnSuccess { println("$it") }
.test()

Observable.just(1, 2, 3)
.first()
.doOnSuccess { println("$it") }
.test()
}

/*
output:
----------
[1, 2, 3]
1
*/

Flow 也提供了类似的操作符比如 ​​first​​,​​toList​​ 等,而且直接输出拆箱后的数据,不必再通过 ​​collect​​ 进行收集

data = flowOf(1, 2, 3).toList()
println("$data")

流式 > 单发 > 流式

有一些业务场景中,可能需要流式 > 单发 > 流式这样的多次转换,这里面通常涉及 ​​flatMap​​ 或 ​​concatMap​​ 等的异步转换。

Observable.just(1, 3, 5)
.concatMapSingle { readFromRemoteRx("$it") }
.doOnComplete { println("complete") }
.subscribe { println("next: $it") }

/*
output:
---------------------
read from remote: 1
next: 1
read from remote: 3
next: 3
read from remote: 5
next: 5
complete
*/

上面例子中,我们在数据流中串行的进行了三次单发请求并返回结果。相对于串行的 ​​concatMapSingle​​, Rx 同时还提供了并行版本的 ​​flatMapSingle​​ 。
同样的逻辑如果用 Flow 实现,如下:

runBlocking {
flowOf(1, 3, 5)
.flatMapConcat { flow { emit(readFromRemote("$it")) } }
.onCompletion { println("complete") }
.collect { println("next: $it") }
}

Flow 的 ​​flatMapConcat​​ 与 Rx 的同名方法功能一样,都是将 ​​flatMap​​ 后的数据流再次进行串行方式。Flow 也提供了 ​​flatMapMerge​​ 处理并行的场景,相当于 Rx 中的 ​​flatMap​​。出于命名清晰的考虑,Flow 的 ​​flatMap​​ 方法已经 Deprecate 改名为 ​​flatMapMerge​​。

​flatMapConcat​​ 或 ​​flatMapMerge​​ 在转换时每次都要构建一个 ​​Flow<T>​​ ,这对于单发数据是没必要的开销,因此我们可以使用 ​​map​​ 简化:

runBlocking {
flowOf(1, 3, 5)
.map { readFromRemote("$it") }
.onCompletion { println("complete") }
.collect { println("next: $it") }
}

效果等价于 ​​flatMapConcat​​,注意 ​​map​​ 无法在并行场景中使用,即使你在 ​​map​​ 中切换了新的线程。Flow 的 ​​map { }​​ 内可调用挂起函数,所以可以基于协程实现异步逻辑,而 Rx 的 ​​map​​ 内只能同步执行,所以有人会将 Flow 的 ​​map​​ 比作 Rx 的 ​​flatMap​​,这是不准确的,因为 Flow 的 map 并不能使整个数据流串行发射,map 会挂起等待当前数据执行结束后再继续。

流式 > Comletable

Rx 还提供了 ​​Completable​​ 类型,我们可以在流式处理中插入无需返回结果的逻辑,例如下面这种场景

fun saveToCacheRx(data: String): Completable {
return Completable
.fromAction { println("saved to cache: $data") }
.delay(100, TimeUnit.MILLISECONDS)
}

Observable.just(1, 2, 3)
.flatMapCompletable { saveToCacheRx("$it") }
.doOnComplete { println("complete") }
.subscribe { println("next: $it") }

/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/

​saveToCacheRx​​ 模拟一个数据存储,Completable 没有任何实际返回值,只用来通知存储已结束,因此日志中没有 ​​next​​ ,只有最后的 ​​complete​​ 。

Flow 如何实现同样的逻辑呢?

suspend fun saveToCache(data: String) {
delay(100)
println("saved to cache: $data")
}

runBlocking {
flowOf(1, 2, 3)
.flatMapMerge { flow<String> { saveToCache("$it") } }
.onCompletion { println("complete") }
.collect { println("next: $it") }

/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/

如上,挂起函数的 ​​saveToCache​​ 没有任何返回值。​​flow { ... }​​ 中调用的挂起函数执行结束后,Flow 的后续执行就会继续,无需像 Rx 那样通过 ​​onComplete​​ 通知。由于挂起函数没有返回任何数值,​​next​​ 日志也不会输出。

总结

在 Java 时代,由于语言能力的缺失 RxJava 需要承包包括单发数据在内的处理, 而进入 Kotlin 时代,挂起函数处理单发数据已经足矣,Flow 不是处理单发数据的最佳方案,我们在今后选型时因该避免对 Flow 的滥用。管中窥豹,可以预见 Kotlin 及协程的强大似的今后 RxJava 的使用场景将越来越少。


举报

相关推荐

0 条评论