Rxjs学习之路
1、小贴士
这篇文章是我的Angular Rxjs Series中的第篇四文章,在继续阅读本文之前,您至少应该熟悉系列中的第一篇基础文章:
1 | // 图谱 |
2 | // ----- 代表一个Observable |
3 | // -----X 代表一个Observable有错误发生 |
4 | // -----| 代表一个Observable结束 |
5 | // (1234)| 代表一个同步Observable结束 |
6 | |
7 | // 特别提示:以下的操作符介绍均采用rxjs6的写法!!! |
2、skip
略过前几个送出元素
1 | /** |
2 | * 略过前几个送出元素 |
3 | * 例如:interval(1000).pipe(skip(3)) |
4 | * source: -----0-----1-----2-----3--.. |
5 | * skip(3) |
6 | * newest: -----------------------3--.. |
7 | */ |
8 | const skipObservable = interval(1000).pipe( |
9 | skip(3), |
10 | take(3) |
11 | ); |
12 | skipObservable.subscribe({ |
13 | next: (value) => { console.log('=====skip操作符: ', value); }, |
14 | error: (err) => { console.log('=====skip操作符: Error: ', err); }, |
15 | complete: () => { console.log('=====skip操作符: complete!'); } |
16 | }); |
3、takeLast
倒过来取最后几个
1 | /** |
2 | * takeLast必须等到整个observable完成(complete),才能知道最后的元素有哪些,并且同步送出 |
3 | * 例如:interval(1000).pipe(take(4), takeLast(2)) |
4 | * source: -----0-----1-----2-----3| |
5 | * takeLast(2) |
6 | * newest: -----------------------(2,3)| |
7 | */ |
8 | const takeLastObservable = interval(1000).pipe( |
9 | take(4), |
10 | takeLast(2) |
11 | ); |
12 | takeLastObservable.subscribe({ |
13 | next: (value) => { console.log('=====takeLast操作符: ', value); }, |
14 | error: (err) => { console.log('=====takeLast操作符: Error: ', err); }, |
15 | complete: () => { console.log('=====takeLast操作符: complete!'); } |
16 | }); |
4、last
去最后送出的那个元素
1 | /** |
2 | * 取得最后一个元素 |
3 | * 例如:interval(1000).pipe(take(4), last()) |
4 | * source: -----0-----1-----2-----3| |
5 | * last() |
6 | * newest: -----------------------3| |
7 | */ |
8 | const lastObsverable = interval(1000).pipe( |
9 | take(4), |
10 | last() |
11 | ); |
12 | lastObsverable.subscribe({ |
13 | next: (value) => { console.log('=====last操作符: ', value); }, |
14 | error: (err) => { console.log('=====last操作符: Error: ', err); }, |
15 | complete: () => { console.log('=====last操作符: complete!'); } |
16 | }); |
5、concat
把多个observable合并成一个
1 | /** |
2 | * 把多个observable 实例合并成一个, 必须先等前一个observable完成(complete),才会继续下一个 |
3 | * source1: -----0-----1-----2| |
4 | * source2: (3)| |
5 | * source3: (45)| |
6 | * concat() |
7 | * newest: -----0-----1-----2-----(345| |
8 | */ |
9 | const source01 = interval(1000).pipe( |
10 | take(3) |
11 | ); |
12 | const source02 = of(3); |
13 | const source03 = of(4, 5); |
14 | // 第一种写法:使用Operator操作符 |
15 | const concatObservable = source01.pipe( |
16 | concat(source02, source03) |
17 | ); |
18 | concatObservable.subscribe({ |
19 | next: (value) => { console.log('=====concat操作符: ', value); }, |
20 | error: (err) => { console.log('=====concat操作符: Error: ', err); }, |
21 | complete: () => { console.log('=====concat操作符: complete!'); } |
22 | }); |
23 | // 第二种写法:使用rxjs内置静态函数 -- import { concat as rxConcat} from 'rxjs'; |
24 | const concatObservable2 = rxConcat( |
25 | source01, |
26 | source02, |
27 | source03 |
28 | ); |
29 | concatObservable2.subscribe({ |
30 | next: (value) => { console.log('=====concat2操作符: ', value); }, |
31 | error: (err) => { console.log('=====concat2操作符: Error: ', err); }, |
32 | complete: () => { console.log('=====concat2操作符: complete!'); } |
33 | }); |
6、startWith
在observable的一开始就要发送的元素(非Observable形式的)
1 | /** |
2 | * 一开始就要发送的元素, startWith 的值是一开始就同步发出的 |
3 | * 例如:interval(1000).pipe(startWith(0)) |
4 | * source: -----0-----1-----2-----3--.. |
5 | * startWith(0) |
6 | * newest: 0-----0-----1-----2-----3--.. |
7 | */ |
8 | const startWidthObservable = interval(1000).pipe( |
9 | startWith(0), |
10 | take(4) |
11 | ); |
12 | startWidthObservable.subscribe({ |
13 | next: (value) => { console.log('=====startWith操作符: ', value); }, |
14 | error: (err) => { console.log('=====startWith操作符: Error: ', err); }, |
15 | complete: () => { console.log('=====startWith操作符: complete!'); } |
16 | }); |
7、merge
merge跟concat一样都是用来合并Observable,但是稍微有些不同
1 | /** |
2 | * 把多个observable同时处理, merge 的逻辑有点像是OR(||),就是当两个observable 其中一个被触发时都可以被处理,这很常用在一个以上的按钮具有部分相同的行为。 |
3 | * source: ----0----1----2| |
4 | * source2: --0--1--2--3--4--5| |
5 | * merge() |
6 | * newest: --0-01--21-3--(24)--5| |
7 | */ |
8 | const observable01 = interval(500).pipe(take(3)); |
9 | const observable02 = interval(300).pipe(take(6)); |
10 | // 第一种写法:使用Operator操作符 |
11 | const mergeObservable = observable01.pipe( |
12 | merge(observable02) |
13 | ); |
14 | mergeObservable.subscribe({ |
15 | next: (value) => { console.log('=====merge操作符: ', value); }, |
16 | error: (err) => { console.log('=====merge操作符: Error: ', err); }, |
17 | complete: () => { console.log('=====merge操作符: complete!'); } |
18 | }); |
19 | |
20 | |
21 | // 第二种写法:使用rxjs内置静态函数 -- import { merge as rxMerge } from 'rxjs'; |
22 | const mergeObservable2 = rxMerge( |
23 | observable01, |
24 | observable02 |
25 | ); |
26 | mergeObservable2.subscribe({ |
27 | next: (value) => { console.log('=====merge2操作符: ', value); }, |
28 | error: (err) => { console.log('=====merge2操作符: Error: ', err); }, |
29 | complete: () => { console.log('=====merge2操作符: complete!'); } |
30 | }); |
完整例子
1 | import { Component, OnInit, OnDestroy } from '@angular/core'; |
2 | import { Subscription, interval, of, concat as rxConcat, merge as rxMerge } from 'rxjs'; |
3 | import { skip, take, takeLast, last, concat, startWith, merge } from 'rxjs/operators'; |
4 | |
5 | ({ |
6 | selector: 'app-rxjs-demo', |
7 | template: ` |
8 | <h3>Rxjs Demo To Study! -- Operators操作符(skip, takeLast, last, concat, startWith, merge)</h3> |
9 | <button (click)="skipHandler()">skip</button> |
10 | <button class="mgLeft" (click)="takeLastHandler()">takeLast</button> |
11 | <button class="mgLeft" (click)="lastHandler()">last</button> |
12 | <button class="mgLeft" (click)="concatHandler()">concat</button> |
13 | <button class="mgLeft" (click)="startWithHandler()">startWith</button> |
14 | <button class="mgLeft" (click)="mergeHandler()">merge</button> |
15 | <app-back></app-back> |
16 | `, |
17 | styles: [` |
18 | .mgLeft { |
19 | margin-left: 20px; |
20 | } |
21 | `] |
22 | }) |
23 | export class RxjsDemoComponent implements OnInit, OnDestroy { |
24 | skipSubscription: Subscription; |
25 | takeLastSubscription: Subscription; |
26 | lastSubscription: Subscription; |
27 | concatSubscription: Subscription; |
28 | concatSubscription2: Subscription; |
29 | startWithSubscription: Subscription; |
30 | mergeSubscription: Subscription; |
31 | mergeSubscription2: Subscription; |
32 | |
33 | constructor() { } |
34 | |
35 | ngOnInit(): void { |
36 | // 图谱 |
37 | // ----- 代表一个Observable |
38 | // -----X 代表一个Observable有错误发生 |
39 | // -----| 代表一个Observable结束 |
40 | // (1234)| 代表一个同步Observable结束 |
41 | } |
42 | |
43 | skipHandler() { |
44 | /** |
45 | * 略过前几个送出元素 |
46 | * 例如:interval(1000).pipe(skip(3)) |
47 | * source: -----0-----1-----2-----3--.. |
48 | * skip(3) |
49 | * newest: -----------------------3--.. |
50 | */ |
51 | const skipObservable = interval(1000).pipe(skip(3), take(3)); |
52 | this.skipSubscription = skipObservable.subscribe({ |
53 | next: (value) => { console.log('=====skip操作符: ', value); }, |
54 | error: (err) => { console.log('=====skip操作符: Error: ', err); }, |
55 | complete: () => { console.log('=====skip操作符: complete!'); } |
56 | }); |
57 | } |
58 | |
59 | takeLastHandler() { |
60 | /** |
61 | * takeLast必须等到整个observable完成(complete),才能知道最后的元素有哪些,并且同步送出 |
62 | * 例如:interval(1000).pipe(take(4), takeLast(2)) |
63 | * source: -----0-----1-----2-----3| |
64 | * takeLast(2) |
65 | * newest: -----------------------(2,3)| |
66 | */ |
67 | const takeLastObservable = interval(1000).pipe(take(4), takeLast(2)); |
68 | this.takeLastSubscription = takeLastObservable.subscribe({ |
69 | next: (value) => { console.log('=====takeLast操作符: ', value); }, |
70 | error: (err) => { console.log('=====takeLast操作符: Error: ', err); }, |
71 | complete: () => { console.log('=====takeLast操作符: complete!'); } |
72 | }); |
73 | } |
74 | |
75 | lastHandler() { |
76 | /** |
77 | * 取得最后一个元素 |
78 | * 例如:interval(1000).pipe(take(4), last()) |
79 | * source: -----0-----1-----2-----3| |
80 | * last() |
81 | * newest: -----------------------3| |
82 | */ |
83 | const lastObsverable = interval(1000).pipe(take(4), last()); |
84 | this.lastSubscription = lastObsverable.subscribe({ |
85 | next: (value) => { console.log('=====last操作符: ', value); }, |
86 | error: (err) => { console.log('=====last操作符: Error: ', err); }, |
87 | complete: () => { console.log('=====last操作符: complete!'); } |
88 | }); |
89 | } |
90 | |
91 | concatHandler() { |
92 | /** |
93 | * 把多个observable 实例合并成一个, 必须先等前一个observable完成(complete),才会继续下一个 |
94 | * source1: -----0-----1-----2| |
95 | * source2: (3)| |
96 | * source3: (45)| |
97 | * concat() |
98 | * newest: -----0-----1-----2-----(345| |
99 | */ |
100 | const source01 = interval(1000).pipe(take(3)); |
101 | const source02 = of(3); |
102 | const source03 = of(4, 5); |
103 | // 第一种写法:使用Operator操作符 |
104 | const concatObservable = source01.pipe(concat(source02, source03)); |
105 | this.concatSubscription = concatObservable.subscribe({ |
106 | next: (value) => { console.log('=====concat操作符: ', value); }, |
107 | error: (err) => { console.log('=====concat操作符: Error: ', err); }, |
108 | complete: () => { console.log('=====concat操作符: complete!'); } |
109 | }); |
110 | // 第二种写法:使用rxjs内置静态函数 |
111 | const concatObservable2 = rxConcat(source01, source02, source03); |
112 | this.concatSubscription2 = concatObservable2.subscribe({ |
113 | next: (value) => { console.log('=====concat2操作符: ', value); }, |
114 | error: (err) => { console.log('=====concat2操作符: Error: ', err); }, |
115 | complete: () => { console.log('=====concat2操作符: complete!'); } |
116 | }); |
117 | } |
118 | |
119 | startWithHandler() { |
120 | /** |
121 | * 一开始就要发送的元素, startWith 的值是一开始就同步发出的 |
122 | * 例如:interval(1000).pipe(startWith(0)) |
123 | * source: -----0-----1-----2-----3--.. |
124 | * startWith(0) |
125 | * newest: 0-----0-----1-----2-----3--.. |
126 | */ |
127 | const startWidthObservable = interval(1000).pipe(startWith(0), take(4)); |
128 | this.startWithSubscription = startWidthObservable.subscribe({ |
129 | next: (value) => { console.log('=====startWith操作符: ', value); }, |
130 | error: (err) => { console.log('=====startWith操作符: Error: ', err); }, |
131 | complete: () => { console.log('=====startWith操作符: complete!'); } |
132 | }); |
133 | } |
134 | |
135 | mergeHandler() { |
136 | /** |
137 | * 把多个observable同时处理, merge 的逻辑有点像是OR(||),就是当两个observable 其中一个被触发时都可以被处理,这很常用在一个以上的按钮具有部分相同的行为。 |
138 | * source: ----0----1----2| |
139 | * source2: --0--1--2--3--4--5| |
140 | * merge() |
141 | * newest: --0-01--21-3--(24)--5| |
142 | */ |
143 | const observable01 = interval(500).pipe(take(3)); |
144 | const observable02 = interval(300).pipe(take(6)); |
145 | // 第一种写法:使用Operator操作符 |
146 | const mergeObservable = observable01.pipe(merge(observable02)); |
147 | this.mergeSubscription = mergeObservable.subscribe({ |
148 | next: (value) => { console.log('=====merge操作符: ', value); }, |
149 | error: (err) => { console.log('=====merge操作符: Error: ', err); }, |
150 | complete: () => { console.log('=====merge操作符: complete!'); } |
151 | }); |
152 | // 第二种写法:使用rxjs内置静态函数 |
153 | const mergeObservable2 = rxMerge(observable01, observable02); |
154 | this.mergeSubscription2 = mergeObservable2.subscribe({ |
155 | next: (value) => { console.log('=====merge2操作符: ', value); }, |
156 | error: (err) => { console.log('=====merge2操作符: Error: ', err); }, |
157 | complete: () => { console.log('=====merge2操作符: complete!'); } |
158 | }); |
159 | } |
160 | |
161 | ngOnDestroy() { |
162 | if (this.skipSubscription) { |
163 | this.skipSubscription.unsubscribe(); |
164 | } |
165 | if (this.takeLastSubscription) { |
166 | this.takeLastSubscription.unsubscribe(); |
167 | } |
168 | if (this.lastSubscription) { |
169 | this.lastSubscription.unsubscribe(); |
170 | } |
171 | if (this.concatSubscription) { |
172 | this.concatSubscription.unsubscribe(); |
173 | } |
174 | if (this.concatSubscription2) { |
175 | this.concatSubscription2.unsubscribe(); |
176 | } |
177 | if (this.startWithSubscription) { |
178 | this.startWithSubscription.unsubscribe(); |
179 | } |
180 | if (this.mergeSubscription) { |
181 | this.mergeSubscription.unsubscribe(); |
182 | } |
183 | if (this.mergeSubscription2) { |
184 | this.mergeSubscription2.unsubscribe(); |
185 | } |
186 | } |
187 | } |
Marble Diagrams【宝珠图】
1. 这个Marble Diagrams【宝珠图】可以很灵活的表现出每个操作符的使用
2. 下面是超链接传送门