0
点赞
收藏
分享

微信扫一扫

RxJS 如何助力业务开发?

最后的执着 2022-05-03 阅读 46

pipe 的原理是链式(reduce)调用 innerSubscribe 函数,和 subscribe 的区别在于,不会触发副作用的执行。

对于输入流的每次 next 调用,pipe 中逻辑的默认执行次数为订阅方的数量。在绝大多数情况下,不希望逻辑执行多遍,于是需要使用多播。很多文档把多播说的非常复杂,个人觉得没有必要,懂这两个 operator 就行:shareshareReplay。把这两个 operator 放在 pipe 的最后一个位置,即可保证前面的逻辑执行次数与订阅方数量无关。不同的是,shareReplay 不管有没有订阅方,都会执行逻辑,而 share 必须存在第一个订阅方之后才开始执行逻辑。

使用 shareReplay 的又叫做热流,其他叫做冷流。在有订阅时,冷流开始执行逻辑,在无订阅时,冷流终止逻辑,冷流具有 rxjs 的核心优势:副作用管理。热流与是否订阅无关,在没有订阅时,也不会终止逻辑。因此,个人很少把 shareReplay 与异步结合使用。

参考资料:可以深入了解[1]

引用链接

[1] 可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables

综上,使用 rxjs 包装逻辑的基本结构如下:

const input$ = new Subject();

const output$ = new BehaviorSubject(initValue);

const subscription = input$.pipe(

operator1(),

operator2(),

).subscribe(output => output$.next(output));

•约定:在 UI 层消费数据时,input$ 结尾的流仅可使用 next;output$ 结尾的流仅可使用 subscribe•一般消费 output$ 使用 hook:const output = useSubscribable(output$)•记得把 subscription 传出去,在组件卸载时 unscribable

有同学问了,为什么不像这么写:

const input$ = new Subject();

const output$ = input$.pipe(

operator1(),

operator2(),

);

•第二种写法需要考虑多播。当冷流、热流混搭,最后到底什么执行了,什么没执行;或者什么副作用能被清理,什么无法清理,就乱掉了。而第一种写法根本不用考虑复杂的多播。You may not need broadcast.md。•在第二种写法中,output$ 的类型是 Observable。•当组件在数据发出后订阅的话,会丢失当前的数据。•没有初始值,调用时需要 useSubscribable(output$, initValue),初始值散落在各个 UI 组件里,降低了可维护性。startsWith 可以部分解决这个问题。•没有 next 方法,当需求变得复杂时,通常需要对 output$ 进行处理,这时候又会改造成第一种写法。

一句话:第一种写法就不用考虑乱七八糟的概念问题

值得注意的是,一个流 error 或者 complete 后,无论如何订阅方都不会再收到新值了,并且不再执行任何逻辑。

input$.pipe(tap(() => throw new Error())).subscribe(x => {

// never reach here

})

所以需要自己注意错误处理,在我们项目中,封装了一些自定义 operator 将异步返回值包装为:

loading: boolean

error?: Error

data?: T

input?: any

从而防止因为出现错误,而导致流异常终止,无法继续执行的问题。

Rxjs 中有个特殊的 ObservableNEVER。它从不发出任何值,常用于提前终止逻辑,类似于普通函数中的 return

input$.pipe(

switchMap(x => {

if(!x) {

return NEVER

}

return x

}),

operator1()

).subscribe(x => {

// x will never be falsy

})

Operator

按好用度排序:

•switchMap: 超好用的 op。在 subscribe 下个流前,自动 unscbscribe 上个流,从而完美解决竞速问题。和异步有关的用它就对了。•debounce:一般个人喜欢与 pair 之类的 op 结合使用,通过判断哪个值改变从而决定 debounce 多少时间。•distinctUntilChanged:浅比较,如果两个值相等则不发出新值。一般传入深比较函数来使用。•withLatestFrom:获得目标流的最新值而不触发副作用。和 combineLatest 一样的是,如果某个流没有值,则会卡住,这个挺坑的。不同的是,combineLatest 参数中的流发出新值不会执行 pipe 中的逻辑。•exhaustMap。在当前流 complete 前,忽视其他流。一般用于和下载功能结合。值得注意的是,需要记得把上一个流 complete,否则下一次逻辑永远不会执行。编写自定义 operator 时,一定要思考如何取消副作用。个人建议组合已有 operator 来编写。

赋能业务 - 以 form-result 为例


接下来看看 rxjs 是如何帮助业务快速迭代的。在我们的业务中,最常见的是查询分析页,由查询条件与结果展示两块组成,这样的结构我称之为 form-result。

但是不管是怎样的界面,都可以划分为如下几层:

一个业务组件通常结构如下:

const { input , o u t p u t , output ,output } = useXxx(); // 从 context 中取数据

const handleClick = useCallback(() => input . n e x t ( s o m e V a l u e ) ) ; / / 使 用 i n p u t .next(someValue)); // 使用 input .next(someValue));//使input

const output = useSubscrible(output ) ; / / 使 用 o u t p u t ); // 使用output );//使output,大部分情况下为 BehaviorSubject,此时不需要初始值

return

{output}

// 渲染界面

所有业务组件最外层是一个 Provider

const value = useXxxProvider();

return <XxxContext.Provider value={value}>{children}</XxxContext.Provider>

几乎所有的业务逻辑都在 useXxxProvider 里,useXxxProvider 会调用 createXxx 去生成各个子业务的逻辑,最后把所有逻辑进行 merge 并返回。

function useXxxProvider() {

const { observables, subscriptions } = useMemo(() => {

// 创建 form 有关的流

const {

observables: formObservables,

subscriptions: formSubscriptions

} = createForm();

// 创建 result 有关的流

const {

observables: resultObservables,

subscriptions: resultSubscriptions

} = createResult(formObservables.formOutput$);

// 把流和订阅返回

return {

observables: {

…formObservables,

…resultObservables

},

subscriptions: […formSubscriptions, …resultSubscriptions]

}

}, [])

// 一般没有任何依赖项,如有,需保证不变,或以 ref 形式传入

// 在组件卸载时销毁订阅

useEffect(() => {

return () => subscriptions.forEach(sub => sub.unsubscribe())

}, []);

// 返回生成的流

return observables;

}

其中,每一小块业务逻辑都被包装在 createXxx 里,其基本结构如下:

1. 创建有关的流,一般分为 input$ 和 output$2. 使用 pipe 连接两条流,包装主要业务逻辑3. 把流与订阅返回

function createXxx() {

// 创建有关的 Subject

const someSubject$ = new Subject();

// 连接流与业务逻辑

const subscription = someSubject$.pipe(…).subscribe()

// 返回流与订阅

return {

observables: { someSubject$ }

subscriptions: [subscription]

}

}

具体到我们的业务上,其中表单逻辑的代码如下:

1. 输入流的类型为 Partial<T>,因为在表单 onChange 的时候,通常不会关心所有表单的值,大部分情况下只要把自己的值调用 formInput$.next() 就行了2. 把输入流中的值与原值进行 merge,并根据实际业务添加参数校验与表单联动的逻辑后,就形成了输出流

function createForm() {

const formInput$ = new Subject<Pratial>();

const formOutput$ = new BehaviorSubject(INIT_FORM);

const subscription = formInput$.pipe(

withLatestFrom(formOutput$),

map(([input, output]) => {

// 如有其他的参数校验、表单联动等逻辑,可以加在这里

const extra = {}

if(‘someThing’ in input) {

extra.otherThing = someValue

}

return {…output, …input, …extra}

})

).subscribe(output => formOutput$.next(output));

return {

observables: {

formInput$,

formOutput$

},

subscriptions: [subscription]

}

}

结果业务的代码如下:

1. 根据实际业务场景生成请求流,如遇请求参数与 UI state 《大厂前端面试题解析+Web核心总结学习笔记+企业项目实战源码+最新高清讲解视频》无偿开源 徽信搜索公众号【编程进阶路】 差别很大的,可以在这里使用 map 进行转换2. 当产生请求流时,发送请求,生成结果流3. 把结果流和有关订阅返回

function createResult(formOutput$){

// 情况一:无查询按钮,修改表单则自动发请求

const request$ = formOutput$.pipe(

pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致

debounce(([prev])=> timer(prev?0:800)),// 首次立刻发请求

distinctUntilChanged(equals) // 根据业务需要,可以增加这个,注意的是不传入深比较函数则为浅比较

);

// 情况二:点击查询按钮再发请求

const requestButtonInput$ = new Subject();

const request$ = requestButtonInput$.pipe(

withLatestFrom(formOutput$),

map(([,form]) => form)

);

// 情况三:两者兼具

const requestButtonInput$ = new Subject();

const request$ = merge(formOutput$.pipe(

pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致

debounce(([prev])=> timer(prev?0:800))// 首次立刻发请求

), requestButtonInput$.pipe(

withLatestForm(formOutput$),

map(([, form]) => form)

));

// 准备完毕请求流,开始发请求

const resultOutput$ = new BehaviorSubject(INIT_RESULT)

const subscription = request$.pipe(

switchMap(request => getResult(request))

).subscribe(output => resultOutput$.next(output));

return {

observables: {

requestButtonInput$,

resultOutput$

},

subscriptions: [subscription]

}

}

useSubscribable


附上 useSubscribable代码:

import useRefWrapper from ‘hooks/useRefWrapper’

import { useState, useEffect } from ‘react’

import { Subscribable, BehaviorSubject } from ‘rxjs’

function getInitValue<T, R>(

subscribable: Subscribable,

selector: (value: T) => R,

initValue?: R

) {

if (initValue !== undefined) {

return initValue

}

if (subscribable instanceof BehaviorSubject) {

return selector((subscribable as any)._value)

}

return undefined

}

export default function useSubscribable(subscribable: BehaviorSubject): T

export default function useSubscribable<T, R>(

subscribable: BehaviorSubject,

selector: (value: T) => R

): R

export default function useSubscribable(

subscribable: Subscribable,

initValue: T

举报

相关推荐

0 条评论