0
点赞
收藏
分享

微信扫一扫

Kotlin协程之flow工作原理,想提高开发效率的必看

safeCollector.releaseIntercepted()
}
}

public abstract suspend fun collectSafely(collector: FlowCollector)
}

可以看到 collect 方法中通过 collector 封装了一个 SafeCollector 对象,并以其为参数执行了 SafeFlow.collectSafely 方法,而 collectSafely 方法只是执行了 block 代码块(collector.block()),它是一个扩展函数,所以执行的示例代码中的 emit(1) 其实就是调用了 SafeCollector.emit(1), 然后在 SafeCollector 中对 FlowCollector 做了一层安全校验后,最后还是会调用 FlowCollector.emit 方法,即创建 SafeCollector 时传入的 collector 对象的 emit 方法。这里只关注核心流程,故不贴出具体代码了。

根据上面我们看到的 collect {} 扩展函数的源码,可以知道其 emit 方法其实就是执行 collect {} 中传入的 action 代码块,参数为 emit 发射的值 – 1.

小结:flow {} 方式(或flowOf, asFlow)创建的 Flow 实例是 SafeFlow 类型,其父类是 AbstractFlow 抽象类,当调用其 collect(FlowCollector) 方法时,首先会执行该 Flow 对象传入的 block 代码块,代码块中一般会有 emit 方法发射值,这个 emit 调用的就是 AbstractFlow.emit 方法,在其中做了安全判定后,会接着调用到 collect 中传入的 FlowCollector.emit 方法,对于 collect {} 的情况,emit 方法内部就是执行 collect 传入的 action 代码块。因为它在每次调用 collect 时才去触发发送数据的动作,所以说 Flow 是冷流

主要流程如下图:

flowOn

学习 flow 一个绕不开的操作符就是 flowOn 了,以下面示例代码为例, flow 需要在协程中使用,下面的 emit(1) 会在 Dispatchers.Default 指定的线程中执行,而 println(it) 会在父协程所在线程中执行:

flow { emit(1) }.flowOn(Dispatchers.Default).collect { println(it) }

flow {} 的源码在上面已经看过了,就是以 block 代码块为参数创建了一个 SafeFlow 对象,接下来看一下 Flow.flowOn 的逻辑:

public fun Flow.flowOn(context: CoroutineContext): Flow {
checkFlowContext(context)
return when {
// 返回自身 Flow 实例
// 这里我们传入了 Dispatchers.Default, 所以不符合这个条件
context == EmptyCoroutineContext -> this
// SafeFlow 不是该类型,因此也不走这个流程,实际上 FusibleFlow 是当连续多次调用 flowOn 后会创建的 Flow 对象
this is FusibleFlow -> fuse(context = context)
// 逻辑走到这里
else -> ChannelFlowOperatorImpl(this, context = context)
}
}

在上面已经对流程注释了一下,因此上述实例代码转换一下即为: SafeFlow.flowOn.collect {} --> ChannelFlowOperatorImpl.collect {}, 这里注意一下创建 ChannelFlowOperatorImpl 对象时传入的两个参数,第一个 this 指的是之前的 SafeFlow 对象,第二个 context 参数即是我们传入的调度器,它是一个协程上下文

ChannelFlowOperatorImpl.collect 实现在父类 ChannelFlowOperator.collect 中,该方法如果发现传入的 coroutineContext 上下文中没有携带调度器,即我们调用 flowOn 时没有传入 Dispatchers 等调度器,则会直接调用上一层 SafeFlow 的 collect 方法(代码不贴了),否则接着调用父类 ChannelFlow 中的 collect 方法,我们直接看 flowOn 中传入了调度器后的逻辑:

internal abstract class ChannelFlowOperator<S, T>(
@JvmField protected val flow: Flow,
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
) : ChannelFlow(context, capacity, onBufferOverflow) {
override suspend fun collect(collector: FlowCollector) {
// 判断 coroutineContext 逻辑
// …
super.collect(collector) // 调用父类 ChannelFlow 中方法
}
}

public abstract class ChannelFlow(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int,
// buffer overflow strategy
@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow {
override suspend fun collect(collector: FlowCollector): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
}

这里可以看到 ChannelFlowOperatorImpl.collect 最后会走到 collector.emitAll(produceImpl(this)) 生产消费的逻辑,我们分步骤看一下生产和接收的流程。

生产数据

首先看上面 produceImpl 方法:

internal fun CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope.() -> Unit
): ReceiveChannel {
val channel = Channel(capacity, onBufferOverflow)
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}

看到这个方法,是不是很熟悉呢?参考之前的 Kotlin之深入理解协程工作原理 的文章可以知道,这里的 produce 方法其实就是启动了一个新的协程,该协程执行的代码块 block 是传入的 collectToFun 参数,接着找 collectToFun 可以发现它会取 ChannelFlowOperator.collectTo 方法:

// ChannelFlowOperator
protected override suspend fun collectTo(scope: ProducerScope) =
// flowCollect 方法实现在子类 ChannelFlowOperatorImpl 中
flowCollect(SendingCollector(scope))

// ChannelFlowOperatorImpl
internal class ChannelFlowOperatorImpl(
flow: Flow,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
override suspend fun flowCollect(collector: FlowCollector) =
// 这个 flow 就是上层传入的 SafeFlow 对象
flow.collect(collector)
}

根据之前的解析, flow.collect(collector) 中的 flow 是 SafeFlow 对象,其 collect 方法会执行 SafeFlow 中传入的代码块(即flow {}),这个代码块中调用了 collector.emit(1) 方法(上面代码可以看出此时的 collector 是 SendingCollector 实例),因此我们看看 SendingCollector.emit 方法做了什么:

public class SendingCollector(
private val channel: SendChannel
) : FlowCollector {
override suspend fun emit(value: T): Unit = channel.send(value)
}

于是可以知道 produceImpl 方法就是启动了一个新的协程,然后在协程中执行上层 flow 对象(所以 flowOn 会对它上游的部分起作用)中的代码块(里面调用了 SendingCollector.emit 方法),然后通过 Channel.send 方法把这个 value 发送出去

接收数据

上面看了启动协程并在其内通过 Channel 发送数据的流程,这里看一下数据是怎么接收的,回到最开始的代码,从 collector.emitAll(channel) 开始,这个 channel 参数就是上一节上调用 send 发送数据的那个 channel 对象:

public suspend fun FlowCollector.emitAll(channel: ReceiveChannel): Unit =
emitAllImpl(channel, consume = true)

private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, consume: Boolean) {
ensureActive()
var cause: Throwable? = null
try {
while (true) {
val result = run { channel.receiveCatching() }
if (result.isClosed) {
result.exceptionOrNull()?.let { throw it }
break // returns normally when result.closeCause == null
}
emit(result.getOrThrow())
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
if (consume) channel.cancelConsumed(cause)
}
}

这里可以看到开了一个无限循环,然后通过 Channel 去接收数据,并通过 emit 方法把接收的值发射出去,至于调用这个 emit 方法的 FlowCollector 对象是谁呢?再回到一开始 flow { emit(1) }.flowOn(Dispatchers.Default).collect { println(it) } 示例中最后面调用的 collect 方法,结合上一章 collect 的解析,可以知道这个 FlowCollector 就是通过 collect 方法传入的代码块创建的对象:

public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector {
override suspend fun emit(value: T) = action(value)
})

于是最终在收到数据并 emit 后,会把 value 传递给 collect {} 中的代码块去执行。

多个flowOn

以下面代码为例:

总结

【Android 详细知识点思维脑图(技能树)】

其实Android开发的知识点就那么多,面试问来问去还是那么点东西。所以面试没有其他的诀窍,只看你对这些知识点准备的充分程度。so,出去面试时先看看自己复习到了哪个阶段就好。

虽然 Android 没有前几年火热了,已经过去了会四大组件就能找到高薪职位的时代了。这只能说明 Android 中级以下的岗位饱和了,现在高级工程师还是比较缺少的,很多高级职位给的薪资真的特别高(钱多也不一定能找到合适的),所以努力让自己成为高级工程师才是最重要的。

由于篇幅有限,这里以图片的形式给大家展示一小部分。

详细整理在GitHub:Android架构视频+BAT面试专题PDF+学习笔记​

(https://github.com/a120464/Android-P7/blob/master/Android%E5%BC%80%E5%8F%91%E4%B8%8D%E4%BC%9A%E8%BF%99%E4%BA%9B%EF%BC%9F%E5%A6%82%E4%BD%95%E9%9D%A2%E8%AF%95%E6%8B%BF%E9%AB%98%E8%96%AA%EF%BC%81.md)**

网上学习 Android的资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。希望这份系统化的技术体系对大家有一个方向参考。

举报

相关推荐

0 条评论