NestJS-系列
NestJS[一]-基础
RxJS
NestJS[二]-核心
NestJS[三]-进阶
NestJS[四]-数据库

RxJS

RxJS(Reactive Extensions for JavaScript) 是一个使用可观察(Observable)序列来编写异步和基于事件(Event-based)程序的库。即观察者模式,编写异步队列和事件处理。cn文档v5

它提供了一种核心类型,即 Observable,附属类型(Observer,Scheduler,Subject)和受 Array(map、filter、reduce、every 等)启发的运算符(Operators),允许将异步事件作为集合进行处理

1
2
3
4
5
6
1. Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
2. Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
3. Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
4. Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
5. Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
6. Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

Nest 内置了 RxJs,如拦截器(Interceptor)等功能都是基于 RxJs 实现的。

参考:
The introduction to Reactive Programming you’ve been missing
响应式编程(Reactive Programming)介绍
响应式编程入门指南 - 通俗易懂 RxJS
最简Rxjs入门教程—别再被Rxjs的概念淹没了
5 分钟理解什么是响应式编程 Reactive Programming
RxJS——给你如丝一般顺滑的编程体验
流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑
响应式编程 | 响应式系统 | 响应式设计
RxJS从入门到精通

响应式宣言

在不同的领域深耕的各个组织都独立地发现了一种如出一辙的软件构建模式。这些系统更加的健壮、更加具有回弹性/韧性、更加灵活,也能更好地满足现代化的需求。

响应式宣言 阐述了响应式系统该有的特质以及实现手段,是一套贯通整个系统的架构设计方案。

响应式系统特质:

  1. 响应性(Responsive):系统应该尽可能地即时响应客户端的请求;
  2. 回弹性(Resilient):系统应该能在出现故障时保持响应能力。
  3. 弹性(Elastic):系统在各种负载下都可以响应。自动缩/扩容,能够做到负载均衡。
  4. 消息驱动(Message Driven):使用消息,让系统中具体的各功能组件具有松耦合、隔离、位置透明、边界清晰明确的特点。

在服务正常或者异常的情况下,都需要及时的对外部请求做出响应,响应可以是正常内容也可以是异常情况下的 fast throw,借助于消息或者事件驱动来达到系统内部异步非阻塞的交互机制,这通常需要系统服务各个分层各个组件都是响应式的。

Reactive Stream 响应式规范,是异步非阻塞且有背压的流处理标准,它定义了实现响应式编程时的 API。

ReactiveX响应式规范的一种实现,通过使用可观察序列,组成异步和基于事件的程序。核心是创建并订阅名为 Observable 的数据流。RX 提供了一个函数工具箱,结合观察者和迭代器模式以及函数式编程,可以组合、合并、过滤、转换和创建数据流。

响应式编程

响应式编程是一种用于开发响应式系统的手段,基于事件驱动。

同步/异步编程:

  1. 同步编程是一种请求响应模型,调用一个方法,等待其响应返回。
  2. 异步编程发出一个任务后,不等待结果,就继续发出下一个任务。获取结果:主动轮询Proactive、被动接收反馈Reactive

Reactive中,上一个任务执行结果的反馈就是一个事件,这个事件的到来会触发依赖链中下一个任务的执行。

依赖链:多个异步任务之间的依赖关系,例如一个任务的执行结果是另一个任务的输入。

响应式编程(Reactive Programming):基于Reactive,是一种面向数据流变化传播的编程范式。即使用异步数据流进行编程

点击页面中的一个按钮,这个点击事件(event)就是一个异步数据流。
在响应式编程中,任何东西都可以包装为流,如:变量、用户的操作、属性、缓存、数据结构等。
还有一系列的工具函数,以函数式编程的方式,组合、创建、过滤这些流。

流Stream

Stream是响应式编程的核心,本质是一个按时间排序的事件(Events)序列。

事件:一个任务的执行结果的反馈,可能是一个值,也可能是一个错误。

流的三种事件及其结果:

  1. Value 某种类型的值
  2. Error 错误
  3. complete 已完成信号(Completed Signal)

自然的,可以监听这三个事件,将事件的结果(值、错误或已完成信号)交给不同的函数进行处理。

监听一个 Stream 也被称作是订阅,定义的处理函数就是观察者,Stream则是被观察者。这也就是观察者模式。

Rxjs的流

在Rxjs中,通过 Observable 类,以迭代器模式创建一个流(也就是创建了流的数据或事件的集合),而 Observer 接口定义了观察者。一个流只有被订阅后才会启动,多次订阅会启动多个相互独立的流。即单播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import { Observable, Observer } from 'rxjs';
const observable = new Observable((subscriber) => {
// 发布
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
subscriber.next(3);
});

// 观察者
const observer: Observer<number> = {
next: (value) => console.log('value: ' + value), // 值
error: (err) => console.error(err), // 错误
complete: () => console.log('done'), // 完成
};
// 订阅
const subscription = observable.subscribe(observer);
setTimeout(() => {
// 停止流
// subscription.unsubscribe();
}, 1000);
// value: 1
// value: 2
// value: 3
// value: 4
// done

Observable类接收一个函数,参数为Subscriber订阅者对象,它有next()error()complete()三个方法,可以触发流中的三个事件。实际是调用了Observer观察者中定义的三个对应的处理函数。

Observable的实例方法subscribe()接收Observer对象,返回Subscription对象。即使用某个观察者去订阅并启动该流。多次调用subscribe()会启动多个相互独立的流。

Subscription对象上有unsubscribe()方法,用于取消订阅并停止该流。

Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Observable<T> implements Subscribable<T> {

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic)

static create: (...args: any[]) => any = <T>(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic) => {
return new Observable<T>(subscribe);
};

lift<R>(operator?: Operator<T, R>): Observable<R>

subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription;
subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription;

forEach(next: (value: T) => void): Promise<void>;
// .....
pipe(): Observable<T>;
// .....
toPromise(): Promise<T | undefined>;
// .....
}
Observer
1
2
3
4
5
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

Subject主体

不同于单播模式总是启动一个新流,多播模式无论什么时候订阅,都只会接收到流上实时的数据。

Subject是一种特殊的Observable,是可观察对象观察者混合体,由Subject订阅一个流,再让多个观察者订阅它。通过中间商实现了多播

本质上,Subject是一个事件发射器,当它接收到流的事件,会将该事件多播给所有订阅它的观察者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Subject, interval } from 'rxjs';
const observable = interval(1000);
const subject = new Subject();
observable.subscribe(subject);
// 通过 subject 订阅同一个实时流
subject.subscribe((value) => console.log('A ' + value));
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value));
}, 1000);
// A 0
// A 1
// B 1 // B 比 A 晚了 1 秒开始订阅同一个实时流
// A 2
// B 2
// A 3
// B 3

Operators操作符

Observable实际上是创建了流的数据或事件的集合,而Operators可以对这个集合进行类似Array的操作。
即使用像 map、filter、scan、take 等纯函数的操作符,以函数式编程风格来处理集合。

操作符是 Observable 类型上的方法,分为实例操作符静态操作符

  1. 实例操作符:内部使用 this 关键字来指代输入的 Observable,处理后,返回一个新的 Observable。
  2. 静态操作符:内部不使用 this 关键字,而是完全依赖于传入的参数,接收非 Observable 或多个 Observable 参数,返回一个新的 Observable。如创建操作符和部分组合操作符。

注意:在Rxjs中,操作符通常是单独导入使用,而不是在类型上。对流进行处理的操作符,以管道pipe()的方式串联使用。就像是Node中对流的管道处理一样。

当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable,它的 subscription 逻辑基于第一个 Observable。

操作符有以下类型:操作符分类

  1. 创建操作符:创建一个新的流,如offromintervaltimer等。
  2. 转换操作符:对流中的数据进行转换,如mapmergeMapconcatMap等。
  3. 过滤操作符:过滤掉流中的数据,如filtertakeskip等。
  4. 组合操作符:将多个流合并成一个流,如mergeconcatcombineLatest等。
  5. 多播操作符:将单播流转为多播流,如sharepublishmulticast等。
  6. 错误处理操作符:处理流中的错误,如catchErrorretry等。
  7. 工具操作符:进行一些辅助操作,如dodelaytimeout等。
  8. 条件和布尔操作符:对流中的数据进行条件判断,如everyfindisEmpty等。
  9. 数学和聚合操作符:对流中的数据进行数学运算,如countmaxminreduce等。

操作符有很多,官网提供了选择操作符的快捷问答

创建操作符

创建操作符属于静态操作符,使用非 Observable 参数创建一个新的流。文档

create

create 将 subscribe 函数转化为一个实际的 Observable。已经废弃,推荐直接使用new Observable()

1
2
3
create<T>(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic) => {
return new Observable<T>(subscribe);
};

from

from 从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable.

1
from<T>(input: any, scheduler?: SchedulerLike): Observable<T>
1
2
3
const observable = from([10, 20, 30]);
observable.subscribe((v) => console.log(v));
// 10 20 30

of

of 将若干参数转化成一个 Observable 对象。

1
of<T>(...args: (SchedulerLike | T)[]): Observable<T>
1
2
3
const observable = of(10, 20, 30);
observable.subscribe((v) => console.log(v));
// 10 20 30

interval

基于给定时间间隔投射数字序列

1
interval(period: number = 0, scheduler: SchedulerLike = async): Observable<number>
1
2
3
const observable = interval(1000);
observable.subscribe((v) => console.log(v));
// 0 1 2 3 ... 每隔 1 秒输出一个数字

timer

给定持续时间后,再按照指定间隔时间依次投射数字。

1
2
3
4
5
timer(
dueTime: number | Date = 0,
periodOrScheduler?: number | SchedulerLike,
scheduler?: SchedulerLike
): Observable<number>
1
2
3
const observable = timer(3000, 1000);
observable.subscribe((v) => console.log(v));
// 3 秒后输出 0,之后每隔 1 秒输出数字序列 1 2 3 ...

range

创建一个发射给定范围内的数字序列的 Observable。

1
range(start: number = 0, count: number = 0, scheduler: SchedulerLike = async): Observable<number>
1
2
3
const observable = range(1, 10);
observable.subscribe((v) => console.log(v));
// 1 2 3 4 5 6 7 8 9

empty

创建一个不发射任何数据的 Observable,只发射 complete 通知。

1
empty<T>(scheduler: SchedulerLike = async): Observable<T>
1
2
3
4
5
6
const observable = empty();
observable.subscribe({
next: (v) => console.log(v),
complete: () => console.log('complete'),
});
// complete

repeat

将数据源重复 count 次。

1
repeat<T>(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction<T>
1
2
3
const observable = of(1, 2, 3).pipe(repeat(2));
observable.subscribe((v) => console.log(v));
// 1 2 3 1 2 3

转换操作符

对流中的数据进行转换,以得到期望的数据。文档

buffer

将一个流产生的数据缓冲到数组中,直到另一个 Observable 发射了数据,然后发射这个数组。

1
buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]>

通常与intervaltimer或dom事件等结合使用,每隔一段时间缓冲一次数据。

1
2
3
4
const clicks = fromEvent(document, 'click');
const interval = interval(1000);
const buffered = interval.pipe(buffer(clicks));
buffered.subscribe((x) => console.log(x));
1
2
3
const observable = interval(1000).pipe(buffer(timer(4000, 3000)));
observable.subscribe((v) => console.log(v));
// [0, 1, 2] [3, 4, 5] [6, 7, 8] ...

map

对流中的每个数据应用一个函数,然后发射结果。与数组的map类似。

1
map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>
1
2
3
const observable = of(1, 2, 3).pipe(map((x) => x * x));
observable.subscribe((v) => console.log(v));
// 1 4 9

mergeMap

将每个源数据值投射成 Observable ,该 Observable 会合并到输出 Observable 中。

1
2
3
4
5
mergeMap<T, R, O extends ObservableInput<any>>(
project: (value: T, index: number) => O,
resultSelector?: number | ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R),
concurrent: number = Number.POSITIVE_INFINITY
): OperatorFunction<T, ObservedValueOf<O> | R>
1
2
3
4
5
6
7
const observable = of('a', 'b', 'c').pipe(
mergeMap((x) => interval(1000).pipe(map((i) => x + i))),
);
observable.subscribe((x) => console.log(x));
// 每隔 1 秒输出
// a0 b0 c0
// a1 b1 c1

mapTo

将每个数据映射为同一个值。已弃用,推荐使用map

1
mapTo<T, R>(value: R): OperatorFunction<T, R>
1
2
3
const observable = of(1, 2, 3).pipe(mapTo('a'));
observable.subscribe((v) => console.log(v));
// a a a

pluck

从每个发射的对象中选择一个属性。已弃用,推荐使用map

1
pluck<T, R>(...properties: string[]): OperatorFunction<T, R>
1
2
3
const observable = of({ name: 'a' }, { name: 'b' }).pipe(pluck('name'));
observable.subscribe((v) => console.log(v));
// a b

scan

对流中的数据进行累加,然后发射结果。类似数组的reduce

1
scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: R): OperatorFunction<T, R>
1
2
3
const observable = of(1, 2, 3).pipe(scan((acc, value) => acc + value, 0));
observable.subscribe((v) => console.log(v));
// 1 3 6 每次数据为前面所有数据的累加

reduce

将源 Observalbe 序列的值归并为单个值。等同于 scan 和 last 一同使用。

1
reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: R): OperatorFunction<T, R>
1
2
3
const observable = of(1, 2, 3).pipe(reduce((acc, value) => acc + value, 0));
observable.subscribe((v) => console.log(v));
// 6

过滤操作符

过滤流中的数据,以得到期望的数据。文档

take

只发射前 n 个数据。

1
take<T>(count: number): MonoTypeOperatorFunction<T>
1
2
3
const observable = interval(1000).pipe(take(3));
observable.subscribe((v) => console.log(v));
// 0 1 2

skip

跳过前 n 个数据,然后发射剩下的数据。

1
skip<T>(count: number): MonoTypeOperatorFunction<T>
1
2
3
const observable = interval(1000).pipe(skip(3));
observable.subscribe((v) => console.log(v));
// 3 4 5 ...

first

只发射第一个数据,或者满足条件的第一个数据。

1
2
3
4
first<T, D = T>(
predicate: (value: T, index: number, source: Observable<T>) => boolean,
defaultValue?: D
): OperatorFunction<T, T | D>;
1
2
3
const observable = of(1, 2, 3, 4).pipe(first((value) => !(value % 2)));
observable.subscribe((v) => console.log(v));
// 找到第一个能被 2 整除的数

debounceTime

只有在过了指定的时间间隔后,才发射最新的数据。类似于防抖。

如果在延时时间内数据源又发送了一个新数据,这个新的数据就会被先缓存住不会发送,等待发送完数据之后并等待延时时间结束才会发送给订阅者

1
debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T>
1
2
3
const observable = interval(1000).pipe(take(3), debounceTime(2000));
observable.subscribe((x) => console.log(x));
// 2

throttleTime

在指定的时间间隔内,只发射第一个数据。类似于节流。

唯一和防抖操作符不一致的地方就在于它对于第一个值是不会阻塞的。

1
throttleTime<T>(duration: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T>
1
2
3
const observable = interval(1000).pipe(throttleTime(2000));
observable.subscribe((x) => console.log(x));
// 0 2 4 ...

filter

只发射满足条件的数据。

1
filter<T>(predicate: (value: T, index: number) => boolean, thisArg?: any): OperatorFunction<T, T>
1
2
3
const observable = of(1, 2, 3, 4).pipe(filter((value) => !(value % 2)));
observable.subscribe((v) => console.log(v));
// 2 4 只发射能被 2 整除的数

distinct

过滤重复数据。

1
distinct<T, K>(keySelector?: (value: T) => K, flushes?: ObservableInput<any>): MonoTypeOperatorFunction<T>
1
2
3
const observable = of(1, 2, 2, 3, 3, 4).pipe(distinct());
observable.subscribe((v) => console.log(v));
// 1 2 3 4

组合操作符

将多个流合并成一个流。文档

concat

按顺序发射多个 Observable 的值。起到连接多个流的作用。

1
concat<T, R>(...observables: (ObservableInput<T> | SchedulerLike)[]): Observable<T>
1
2
3
const observable = concat(interval(1000).pipe(take(3)), range(1, 3));
observable.subscribe((v) => console.log(v));
// 0 1 2 1 2 3

merge

将多个 Observable 合并成一个 Observable,按照它们开始的时间顺序发射数据。

1
merge<T, R>(...observables: (ObservableInput<T> | SchedulerLike)[]): Observable<T>
1
2
3
const observable = merge(interval(1000).pipe(take(3)), range(1, 3));
observable.subscribe((v) => console.log(v));
// 1 2 3 0 1 2

concatAll

将高阶 Observable 转化为一阶 Observable,串行发射。

底层使用 mergeAll。将其并行度设置为 1,即可实现串行发射。

1
2
3
function concatAll<O extends ObservableInput<any>>(): OperatorFunction<O, ObservedValueOf<O>> {
return mergeAll(1);
}
1
2
3
const observable = of(of(1, 2, 3), of(4, 5, 6)).pipe(concatAll());
observable.subscribe((v) => console.log(v));
// 1 2 3 4 5 6

mergeAll

将高阶 Observable 转化为一阶 Observable,并行发射。底层使用 mergeMap。

传入一个可选参数,表示并行度。默认为 Infinity。当并行度为 1 时,等同于 concatAll。

1
2
3
function mergeAll<O extends ObservableInput<any>>(concurrent: number = Infinity): OperatorFunction<O, ObservedValueOf<O>> {
return mergeMap(identity, concurrent);
}
1
2
3
4
5
6
7
8
const observable = of(0, 1, 2).pipe(
map((x) => interval(1000)),
mergeAll(),
);
observable.subscribe((v) => console.log(v));
// 0 0 0
// 1 1 1
// 2 2 2

工具操作符

进行一些辅助操作。文档

tap

在数据流中的每个数据上执行副作用,但不会改变数据流。

1
2
3
4
5
function tap<T>(
next?: ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): MonoTypeOperatorFunction<T>;
1
2
3
const observable = of(1, 2, 3).pipe(tap((v) => console.log(v)));
observable.subscribe();
// 1 2 3

timeout

如果在指定时间内流没有发射数据,就发射一个错误。

1
function timeout<T>(each: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
1
2
3
4
5
6
const observable = interval(2000).pipe(timeout(1000));
observable.subscribe({
next: (v) => console.log(v),
error: (err) => console.log(err),
});
// TimeoutError: Timeout has occurred

delay

延迟发射数据。

1
function delay<T>(due: number | Date, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T>
1
2
3
const observable = of(1, 2, 3).pipe(delay(1000));
observable.subscribe((v) => console.log(v));
// 延迟 1 秒后输出 1 2 3

错误处理操作符

处理流中的错误。文档

catchError

捕获错误,然后用另一个 Observable 替换它,或者继续抛出错误。

1
2
3
function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>>;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const observable = of(1, 2, 3).pipe(
map((v) => {
if (v === 3) {
throw new Error('error');
}
return v;
}),
catchError((err) => of('a', 'b', 'c')),
);
observable.subscribe({
next: (v) => console.log(v),
error: (err) => console.log(err),
});
// 1 2 a b c
// 从 1 2 到 3 时抛出错误,捕获错误后新的流输出 a b c

retry

如果发生错误,就重试。

1
2
function retry<T>(count?: number): MonoTypeOperatorFunction<T>;
function retry<T>(config: RetryConfig): MonoTypeOperatorFunction<T>;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const observable = of(1, 2, 3).pipe(
map((v) => {
if (v === 3) {
throw new Error('error');
}
return v;
}),
retry(1),
);
observable.subscribe({
next: (v) => console.log(v),
error: (err) => console.log(err),
});
// 1 2
// 1 2 这次为重试,仍然出错
// Error: error

更多操作符就多看文档吧,还有很多很多~。