RxJS快速入门

  • A+
所属分类:Web前端
摘要

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。


内容导航

RxJS是什么

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

可以把 RxJS 当做是用来处理事件的 Lodash

ReactiveX 结合了 观察者模式迭代器模式使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。

RxJS的主要成员

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

Observable (可观察对象)

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。Observables 是多个值的惰性推送集合。

  • of():用于创建简单的Observable,该Observable只发出给定的参数,在发送完这些参数后发出完成通知.
  • from():从一个数组、类数组对象、promise、迭代器对象或者类Observable对象创建一个Observable.
  • fromEvent(),:把event转换成Observable.
  • range():在指定起始值返回指定数量数字.
  • interval():基于给定时间间隔发出数字序列。返回一个发出无限自增的序列整数,可以选择固定的时间间隔进行发送。
  • timer():创建一个Observable,该Observable在初始延时之后开始发送并且在每个时间周期后发出自增的数字

创建 Observable

import { Observable,of,from,fromEvent,range,interval } from 'rxjs'; import { map } from 'rxjs/operators';  	const Observable1 = new Observable(subscriber => {     try{           subscriber.next(1);           subscriber.next(2);           subscriber.next(3);           setTimeout(() => {             subscriber.next(4);             subscriber.complete();           }, 1000);         } catch (err) {         	subscriber.error(err);	//传递一个错误对象,如果捕捉到异常的话。     	}     });     const Observable2 = from([       { name: 'Dave', age: 34, salary: 2000 },       { name: 'Nick', age: 37, salary: 32000 },       { name: 'Howie', age: 40, salary: 26000 },       { name: 'Brian', age: 40, salary: 30000 },       { name: 'Kevin', age: 47, salary: 24000 },     ]); 	const Observable3 = of("Dave","Nick");//把所有参数组合到数组,逐个提供给消费者 	const Observable4 = range(1,10); 	const Observable5 = interval(3000);//从零开始每3000毫秒自增并提供给消费者 	const Observable6 = timer(3000,1000);//等待3000毫秒后,从零开始每1000毫秒自增并提供给消费者 

订阅 Observables

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。

当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):

import { Observable,of,from,fromEvent,range,interval } from 'rxjs'; 	const observable1 = range(1,10);     observable1.subscribe(           num => {             console.log(num);           },           err => console.log(err),           () => console.log("Streaming is over.")         ); 

执行 Observables

Observable 执行可以传递三种类型的值:

  • "Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
  • "Error" 通知: 发送一个 JavaScript 错误 或 异常。
  • "Complete" 通知: 不再发送任何值。

"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs'; import { map } from 'rxjs/operators';  	const observable = new Observable(subscriber => {     try{           subscriber.next(1);           subscriber.next(2);           subscriber.next(3);           subscriber.complete();           subscriber.next(4); // 因为违反规约,所以不会发送         } catch (err) {         	subscriber.error(err);	//传递一个错误对象,如果捕捉到异常的话。     	}     }); 

清理 Observable 执行

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源

当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';     const observable = new Observable(subscriber => {       let intervalID = setInterval(() => {         subscriber.next('hi');       }, 1000);       // 提供取消和清理 interval 资源的方法       return function unsubscribe() {         clearInterval(intervalID);       };     });     let subscription = observable.subscribe(x => console.log(x));     subscription.unsubscribe(); 

Observer (观察者)

观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:nexterrorcomplete 。下面的示例是一个典型的观察者对象:

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。

observable.subscribe(     next: x => console.log('Observer got a next value: ' + x),     error: err => console.error('Observer got an error: ' + err),     complete: () => console.log('Observer got a complete notification') ); 

Subscription (订阅)

Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理对象)。

Subscription 基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable 执行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs'; 	var observable1 = interval(1000);     var subscription1 = observable1.subscribe(x => console.log(x));     // 稍后:     // 这会取消正在进行中的 Observable 执行     // Observable 执行是通过使用观察者调用 subscribe 方法启动的     subscription1.unsubscribe();      var observable2 = interval(400);     var observable3 = interval(300);     var subscription2 = observable2.subscribe(x => console.log('first: ' + x));     var childSubscription = observable3.subscribe(x => console.log('second: ' + x));     subscription2.add(childSubscription);     setTimeout(() => {       // subscription 和 childSubscription 都会取消订阅       subscription2.unsubscribe();     }, 1000); 

Subject (主体)

RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

还有一些特殊类型的 Subject:BehaviorSubjectReplaySubjectAsyncSubject

每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。

在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)error(e)complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。

import { Subject,from } from 'rxjs'; 	//我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值  	var subject1 = new Subject();     subject1.subscribe({       next: (v) => console.log('observerA: ' + v)     });     subject1.subscribe({       next: (v) => console.log('observerB: ' + v)     });     subject1.next(1);     subject1.next(2);     //因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法     var subject2 =new Subject();     subject2.subscribe({       next: (v) => console.log('observerA: ' + v)     });     subject2.subscribe({       next: (v) => console.log('observerB: ' + v)     });     var observable = from([1, 2, 3]);     observable.subscribe(subject2); // 你可以提供一个 Subject 进行订阅 

多播的 Observables

“多播 Observable” 通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 “单播 Observable” 只发送通知给单个观察者。

多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

在底层,这就是 multicast 操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。

import { Subject } from 'rxjs/internal/Subject'; import { take, multicast } from 'rxjs/operators'; 	     const source = timer(1000, 2500).pipe(take(5));     const subject = new Subject();     subject.subscribe({       next: (v) => console.log('observerC: ' + v)     });     subject.subscribe({       next: (v) => console.log('observerD: ' + v)     });     const multicasted = source.pipe(multicast(subject));     multicasted.subscribe({       next: (v) => console.log('observerA: ' + v)     });     multicasted.subscribe({       next: (v) => console.log('observerB: ' + v)     }); 	source.subscribe(subject); 

BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。

import { BehaviorSubject } from 'rxjs'; 	//BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。 	const subject = new BehaviorSubject(0); // 0是初始值     subject.subscribe({       next: (v) => console.log('observerA: ' + v)     });      subject.next(1);     subject.next(2);      subject.subscribe({       next: (v) => console.log('observerB: ' + v)     });      subject.next(3); 

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。

import { ReplaySubject } from 'rxjs'; 	const subject = new ReplaySubject(3); // 为新的订阅者缓冲最后3个值     subject.subscribe({       next: (v) => console.log('observerA: ' + v)     });     subject.next(1);     subject.next(2);     subject.next(3);     subject.next(4);     subject.subscribe({       next: (v) => console.log('observerB: ' + v)     });     subject.next(5);  	//我们缓存数量100,但 window time 参数只设置了120毫秒     const subject = new ReplaySubject(100, 120 /* windowTime */);      subject.subscribe({       next: (v) => console.log('observerA: ' + v)     });     let i = 1;     setInterval(() => subject.next(i++), 200);     setTimeout(() => {       subject.subscribe({         next: (v) => console.log('observerB: ' + v)       });     }, 1000);	 

AsyncSubject

AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

AsyncSubject 和 last() 操作符类似,因为它也是等待 complete 通知,以发送一个单个值。

import { AsyncSubject } from 'rxjs';     const subject = new AsyncSubject();      subject.subscribe({       next: (v) => console.log('observerA: ' + v)     });      subject.next(1);     subject.next(2);     subject.next(3);     subject.next(4);      subject.subscribe({       next: (v) => console.log('observerB: ' + v)     });      subject.next(5);     subject.complete(); 

Scheduler (调度器)

调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

import { asyncScheduler, Observable } from 'rxjs'; 	//我们使用普通的 Observable ,它同步地发出值`1`、`2`、`3`,并使用操作符 `observeOn` 来指定 `async` 调度器发送这些值。 	const observable = new Observable(subscriber => {       subscriber.next(1);       subscriber.next(2);       subscriber.next(3);       subscriber.complete();     })       .pipe(         observeOn(asyncScheduler)       );      console.log('just before subscribe');     observable.subscribe({       next: x => console.log('got value ' + x),       error: err => console.error('something wrong occurred: ' + err),       complete: () => console.log('done'),     });     console.log('just after subscribe');     //你会发现"just after subscribe"在"got value..."之前就出现了     //just before subscribe     //just after subscribe     //got value 1     //got value 2     //got value 3     //done      

调度器类型

async 调度器是 RxJS 提供的内置调度器中的一个。可以通过使用 Scheduler 对象的静态属性创建并返回其中的每种类型的调度器。

调度器 目的
null 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
queueScheduler 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
asapScheduler 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。
asyncScheduler 使用 setInterval 的调度。用于基于时间的操作符。
animationFrameScheduler 计划将在下一次浏览器内容重新绘制之前发生的任务。 可用于创建流畅的浏览器动画。

Pipeable(操作符)

操作符就是函数,管道操作符本质上是一个纯函数,它将一个Observable作为输入并生成另一个Observable作为输出。订阅输出Observable也将订阅输入Observable。 操作符有两种:

管道操作符是一个将Observable作为其输入并返回另一个Observable的函数。这是一个纯粹的操作:以前的Observable保持不变。

  1. 管道操作符是可以使用语法observableInstance.pipe(operator())传递给Observable的类型。 这些包括filter()mergeMap()。 调用时,它们不会更改现有的Observable实例。 相反,它们返回一个新的Observable,其订阅逻辑基于第一个Observable。

  2. 创建运算符是另一种运算符,可以称为独立函数来创建新的Observable。例如:of(1,2,3)创建一个observable ,该对象将依次发射1、2和3。创建运算符将在后面的部分中详细讨论。

obs.pipe(   op1(),   op2(),   op3(),   op3(), ) 

常用的操作符

finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>:

返回原始Observable,但在Observable完成或发生错误终止时将调用指定的函数。

创建操作符

连接创建操作符

These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.

转换操作符

过滤操作符

组合操作符

Also see the Join Creation Operators section above.

多播操作符

错误处理操作符

工具操作符

条件和布尔操作符

数学和聚合操作符