0%

Rxjs【skip, takeLast, last, concat, startWith, merge】

Rxjs学习之路

1、小贴士

这篇文章是我的Angular Rxjs Series中的第篇四文章,在继续阅读本文之前,您至少应该熟悉系列中的第一篇基础文章:

Rxjs6都改变了些什么?

Rxjs【Observable】

1
// 图谱
2
// ----- 代表一个Observable
3
// -----X 代表一个Observable有错误发生
4
// -----| 代表一个Observable结束
5
// (1234)| 代表一个同步Observable结束
6
7
// 特别提示:以下的操作符介绍均采用rxjs6的写法!!!

2、skip

略过前几个送出元素
1
/**
2
 * 略过前几个送出元素
3
 * 例如:interval(1000).pipe(skip(3))
4
 * source:      -----0-----1-----2-----3--..
5
 *                        skip(3)
6
 * newest:      -----------------------3--..
7
 */
8
const skipObservable = interval(1000).pipe(
9
    skip(3),
10
    take(3)
11
);
12
skipObservable.subscribe({
13
    next: (value) => { console.log('=====skip操作符: ', value); },
14
    error: (err) => { console.log('=====skip操作符: Error: ', err); },
15
    complete: () => { console.log('=====skip操作符: complete!'); }
16
});

3、takeLast

倒过来取最后几个
1
/**
2
 * takeLast必须等到整个observable完成(complete),才能知道最后的元素有哪些,并且同步送出
3
 * 例如:interval(1000).pipe(take(4), takeLast(2))
4
 * source:      -----0-----1-----2-----3|
5
 *                      takeLast(2)
6
 * newest:      -----------------------(2,3)|
7
 */
8
const takeLastObservable = interval(1000).pipe(
9
    take(4),
10
    takeLast(2)
11
);
12
takeLastObservable.subscribe({
13
    next: (value) => { console.log('=====takeLast操作符: ', value); },
14
    error: (err) => { console.log('=====takeLast操作符: Error: ', err); },
15
    complete: () => { console.log('=====takeLast操作符: complete!'); }
16
});

4、last

去最后送出的那个元素
1
/**
2
 * 取得最后一个元素
3
 * 例如:interval(1000).pipe(take(4), last())
4
 * source:      -----0-----1-----2-----3|
5
 *                      last()
6
 * newest:      -----------------------3|
7
 */
8
const lastObsverable = interval(1000).pipe(
9
    take(4),
10
    last()
11
);
12
lastObsverable.subscribe({
13
    next: (value) => { console.log('=====last操作符: ', value); },
14
    error: (err) => { console.log('=====last操作符: Error: ', err); },
15
    complete: () => { console.log('=====last操作符: complete!'); }
16
});

5、concat

把多个observable合并成一个
1
/**
2
 * 把多个observable 实例合并成一个, 必须先等前一个observable完成(complete),才会继续下一个
3
 * source1:     -----0-----1-----2|
4
 * source2:     (3)|
5
 * source3:     (45)|
6
 *            concat()
7
 * newest:      -----0-----1-----2-----(345|
8
 */
9
const source01 = interval(1000).pipe(
10
    take(3)
11
);
12
const source02 = of(3);
13
const source03 = of(4, 5);
14
// 第一种写法:使用Operator操作符
15
const concatObservable = source01.pipe(
16
    concat(source02, source03)
17
);
18
concatObservable.subscribe({
19
    next: (value) => { console.log('=====concat操作符: ', value); },
20
    error: (err) => { console.log('=====concat操作符: Error: ', err); },
21
    complete: () => { console.log('=====concat操作符: complete!'); }
22
});
23
// 第二种写法:使用rxjs内置静态函数 -- import { concat as rxConcat} from 'rxjs';
24
const concatObservable2 = rxConcat(
25
    source01,
26
    source02,
27
    source03
28
);
29
concatObservable2.subscribe({
30
    next: (value) => { console.log('=====concat2操作符: ', value); },
31
    error: (err) => { console.log('=====concat2操作符: Error: ', err); },
32
    complete: () => { console.log('=====concat2操作符: complete!'); }
33
});

6、startWith

在observable的一开始就要发送的元素(非Observable形式的)
1
/**
2
 * 一开始就要发送的元素, startWith 的值是一开始就同步发出的
3
 * 例如:interval(1000).pipe(startWith(0))
4
 * source:      -----0-----1-----2-----3--..
5
 *                     startWith(0)
6
 * newest:      0-----0-----1-----2-----3--..
7
 */
8
const startWidthObservable = interval(1000).pipe(
9
    startWith(0),
10
    take(4)
11
);
12
startWidthObservable.subscribe({
13
    next: (value) => { console.log('=====startWith操作符: ', value); },
14
    error: (err) => { console.log('=====startWith操作符: Error: ', err); },
15
    complete: () => { console.log('=====startWith操作符: complete!'); }
16
});

7、merge

merge跟concat一样都是用来合并Observable,但是稍微有些不同
1
/**
2
 * 把多个observable同时处理, merge 的逻辑有点像是OR(||),就是当两个observable 其中一个被触发时都可以被处理,这很常用在一个以上的按钮具有部分相同的行为。
3
 * source:      ----0----1----2|
4
 * source2:     --0--1--2--3--4--5|
5
 *                   merge()
6
 * newest:      --0-01--21-3--(24)--5|
7
 */
8
const observable01 = interval(500).pipe(take(3));
9
const observable02 = interval(300).pipe(take(6));
10
// 第一种写法:使用Operator操作符
11
const mergeObservable = observable01.pipe(
12
    merge(observable02)
13
);
14
mergeObservable.subscribe({
15
    next: (value) => { console.log('=====merge操作符: ', value); },
16
    error: (err) => { console.log('=====merge操作符: Error: ', err); },
17
    complete: () => { console.log('=====merge操作符: complete!'); }
18
});
19
20
21
// 第二种写法:使用rxjs内置静态函数 -- import { merge as rxMerge } from 'rxjs';
22
const mergeObservable2 = rxMerge(
23
    observable01,
24
    observable02
25
);
26
mergeObservable2.subscribe({
27
    next: (value) => { console.log('=====merge2操作符: ', value); },
28
    error: (err) => { console.log('=====merge2操作符: Error: ', err); },
29
    complete: () => { console.log('=====merge2操作符: complete!'); }
30
});
完整例子
1
import { Component, OnInit, OnDestroy } from '@angular/core';
2
import { Subscription, interval, of, concat as rxConcat, merge as rxMerge } from 'rxjs';
3
import { skip, take, takeLast, last, concat, startWith, merge } from 'rxjs/operators';
4
5
@Component({
6
    selector: 'app-rxjs-demo',
7
    template: `
8
        <h3>Rxjs Demo To Study! -- Operators操作符(skip, takeLast, last, concat, startWith, merge)</h3>
9
        <button (click)="skipHandler()">skip</button>
10
        <button class="mgLeft" (click)="takeLastHandler()">takeLast</button>
11
        <button class="mgLeft" (click)="lastHandler()">last</button>
12
        <button class="mgLeft" (click)="concatHandler()">concat</button>
13
        <button class="mgLeft" (click)="startWithHandler()">startWith</button>
14
        <button class="mgLeft" (click)="mergeHandler()">merge</button>
15
        <app-back></app-back>
16
    `,
17
    styles: [`
18
        .mgLeft {
19
            margin-left: 20px;
20
        }
21
    `]
22
})
23
export class RxjsDemoComponent implements OnInit, OnDestroy {
24
    skipSubscription: Subscription;
25
    takeLastSubscription: Subscription;
26
    lastSubscription: Subscription;
27
    concatSubscription: Subscription;
28
    concatSubscription2: Subscription;
29
    startWithSubscription: Subscription;
30
    mergeSubscription: Subscription;
31
    mergeSubscription2: Subscription;
32
33
    constructor() { }
34
35
    ngOnInit(): void {
36
        // 图谱
37
        // ----- 代表一个Observable
38
        // -----X 代表一个Observable有错误发生
39
        // -----| 代表一个Observable结束
40
        // (1234)| 代表一个同步Observable结束
41
    }
42
43
    skipHandler() {
44
        /**
45
         * 略过前几个送出元素
46
         * 例如:interval(1000).pipe(skip(3))
47
         * source:      -----0-----1-----2-----3--..
48
         *                        skip(3)
49
         * newest:      -----------------------3--..
50
         */
51
        const skipObservable = interval(1000).pipe(skip(3), take(3));
52
        this.skipSubscription = skipObservable.subscribe({
53
            next: (value) => { console.log('=====skip操作符: ', value); },
54
            error: (err) => { console.log('=====skip操作符: Error: ', err); },
55
            complete: () => { console.log('=====skip操作符: complete!'); }
56
        });
57
    }
58
59
    takeLastHandler() {
60
        /**
61
         * takeLast必须等到整个observable完成(complete),才能知道最后的元素有哪些,并且同步送出
62
         * 例如:interval(1000).pipe(take(4), takeLast(2))
63
         * source:      -----0-----1-----2-----3|
64
         *                      takeLast(2)
65
         * newest:      -----------------------(2,3)|
66
         */
67
        const takeLastObservable = interval(1000).pipe(take(4), takeLast(2));
68
        this.takeLastSubscription = takeLastObservable.subscribe({
69
            next: (value) => { console.log('=====takeLast操作符: ', value); },
70
            error: (err) => { console.log('=====takeLast操作符: Error: ', err); },
71
            complete: () => { console.log('=====takeLast操作符: complete!'); }
72
        });
73
    }
74
75
    lastHandler() {
76
        /**
77
         * 取得最后一个元素
78
         * 例如:interval(1000).pipe(take(4), last())
79
         * source:      -----0-----1-----2-----3|
80
         *                      last()
81
         * newest:      -----------------------3|
82
         */
83
        const lastObsverable = interval(1000).pipe(take(4), last());
84
        this.lastSubscription = lastObsverable.subscribe({
85
            next: (value) => { console.log('=====last操作符: ', value); },
86
            error: (err) => { console.log('=====last操作符: Error: ', err); },
87
            complete: () => { console.log('=====last操作符: complete!'); }
88
        });
89
    }
90
91
    concatHandler() {
92
        /**
93
         * 把多个observable 实例合并成一个, 必须先等前一个observable完成(complete),才会继续下一个
94
         * source1:     -----0-----1-----2|
95
         * source2:     (3)|
96
         * source3:     (45)|
97
         *            concat()
98
         * newest:      -----0-----1-----2-----(345|
99
         */
100
        const source01 = interval(1000).pipe(take(3));
101
        const source02 = of(3);
102
        const source03 = of(4, 5);
103
        // 第一种写法:使用Operator操作符
104
        const concatObservable = source01.pipe(concat(source02, source03));
105
        this.concatSubscription = concatObservable.subscribe({
106
            next: (value) => { console.log('=====concat操作符: ', value); },
107
            error: (err) => { console.log('=====concat操作符: Error: ', err); },
108
            complete: () => { console.log('=====concat操作符: complete!'); }
109
        });
110
        // 第二种写法:使用rxjs内置静态函数
111
        const concatObservable2 = rxConcat(source01, source02, source03);
112
        this.concatSubscription2 = concatObservable2.subscribe({
113
            next: (value) => { console.log('=====concat2操作符: ', value); },
114
            error: (err) => { console.log('=====concat2操作符: Error: ', err); },
115
            complete: () => { console.log('=====concat2操作符: complete!'); }
116
        });
117
    }
118
119
    startWithHandler() {
120
        /**
121
         * 一开始就要发送的元素, startWith 的值是一开始就同步发出的
122
         * 例如:interval(1000).pipe(startWith(0))
123
         * source:      -----0-----1-----2-----3--..
124
         *                     startWith(0)
125
         * newest:      0-----0-----1-----2-----3--..
126
         */
127
        const startWidthObservable = interval(1000).pipe(startWith(0), take(4));
128
        this.startWithSubscription = startWidthObservable.subscribe({
129
            next: (value) => { console.log('=====startWith操作符: ', value); },
130
            error: (err) => { console.log('=====startWith操作符: Error: ', err); },
131
            complete: () => { console.log('=====startWith操作符: complete!'); }
132
        });
133
    }
134
135
    mergeHandler() {
136
        /**
137
         * 把多个observable同时处理, merge 的逻辑有点像是OR(||),就是当两个observable 其中一个被触发时都可以被处理,这很常用在一个以上的按钮具有部分相同的行为。
138
         * source:      ----0----1----2|
139
         * source2:     --0--1--2--3--4--5|
140
         *                   merge()
141
         * newest:      --0-01--21-3--(24)--5|
142
         */
143
        const observable01 = interval(500).pipe(take(3));
144
        const observable02 = interval(300).pipe(take(6));
145
        // 第一种写法:使用Operator操作符
146
        const mergeObservable = observable01.pipe(merge(observable02));
147
        this.mergeSubscription = mergeObservable.subscribe({
148
            next: (value) => { console.log('=====merge操作符: ', value); },
149
            error: (err) => { console.log('=====merge操作符: Error: ', err); },
150
            complete: () => { console.log('=====merge操作符: complete!'); }
151
        });
152
        // 第二种写法:使用rxjs内置静态函数
153
        const mergeObservable2 = rxMerge(observable01, observable02);
154
        this.mergeSubscription2 = mergeObservable2.subscribe({
155
            next: (value) => { console.log('=====merge2操作符: ', value); },
156
            error: (err) => { console.log('=====merge2操作符: Error: ', err); },
157
            complete: () => { console.log('=====merge2操作符: complete!'); }
158
        });
159
    }
160
161
    ngOnDestroy() {
162
        if (this.skipSubscription) {
163
            this.skipSubscription.unsubscribe();
164
        }
165
        if (this.takeLastSubscription) {
166
            this.takeLastSubscription.unsubscribe();
167
        }
168
        if (this.lastSubscription) {
169
            this.lastSubscription.unsubscribe();
170
        }
171
        if (this.concatSubscription) {
172
            this.concatSubscription.unsubscribe();
173
        }
174
        if (this.concatSubscription2) {
175
            this.concatSubscription2.unsubscribe();
176
        }
177
        if (this.startWithSubscription) {
178
            this.startWithSubscription.unsubscribe();
179
        }
180
        if (this.mergeSubscription) {
181
            this.mergeSubscription.unsubscribe();
182
        }
183
        if (this.mergeSubscription2) {
184
            this.mergeSubscription2.unsubscribe();
185
        }
186
    }
187
}

Marble Diagrams【宝珠图】

1. 这个Marble Diagrams【宝珠图】可以很灵活的表现出每个操作符的使用
2. 下面是超链接传送门

Marble Diagrams【宝珠图】