目录
  1. 1. 一、RxJava 的设计哲学
  2. 2. 二、Observable 的创建
    1. 2.1. 2.1 创建方式分类
    2. 2.2. 2.2 create 的底层实现
    3. 2.3. 2.3 冷 Observable vs 热 Observable
  3. 3. 三、Observer 与订阅流程
    1. 3.1. 3.1 Observer 的四个回调
    2. 3.2. 3.2 subscribe 的调用链
    3. 3.3. 3.3 Disposable 与资源管理
  4. 4. 四、操作符的装饰器模式
    1. 4.1. 4.1 map 操作符原理
    2. 4.2. 4.2 flatMap 操作符原理
    3. 4.3. 4.3 lift 操作符——操作符的通用基类
  5. 5. 五、线程调度(Scheduler)
    1. 5.1. 5.1 subscribeOn vs observeOn
    2. 5.2. 5.2 subscribeOn 实现原理
    3. 5.3. 5.3 observeOn 实现原理
    4. 5.4. 5.4 Schedulers 的内部实现
  6. 6. 六、背压(Backpressure)与 Flowable
    1. 6.1. 6.1 背压问题的产生
    2. 6.2. 6.2 Flowable 的背压策略
    3. 6.3. 6.3 BackpressureStrategy 详解
  7. 7. 七、实战场景
    1. 7.1. 7.1 网络请求处理
    2. 7.2. 7.2 多级缓存
    3. 7.3. 7.3 合并多个数据源
  8. 8. 八、内存泄漏与最佳实践
    1. 8.1. 8.1 RxJava 导致内存泄漏的常见原因
    2. 8.2. 8.2 使用 AutoDispose / RxLifecycle
  9. 9. 九、常见面试题
Java进阶之RxJava编程原理

一、RxJava 的设计哲学

RxJava 是 ReactiveX(Reactive Extensions)的 Java 实现,它将观察者模式(Observer Pattern)、迭代器模式(Iterator Pattern)和函数式编程融合在一起,提供了一种处理异步数据流的统一编程模型。

RxJava 解决的核心问题:异步操作的组合和编排。在 Android 开发中,我们经常需要处理:网络请求 → 数据库写入 → UI 更新,或者多个并发的网络请求 → 合并结果 → 错误处理。传统的回调(Callback)方式会导致”回调地狱”,而 RxJava 通过操作符(Operator)链将异步逻辑扁平化,使代码可读性大幅提升。

RxJava 的三大核心角色:

  • Observable(被观察者):数据的生产者,负责发射数据流
  • Observer(观察者):数据的消费者,负责处理接收到的数据
  • Operator(操作符):用于转换、过滤、组合数据流

二、Observable 的创建

2.1 创建方式分类

// 1. create —— 最基础,手动控制发射
Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onComplete();
});

// 2. just —— 发射固定的若干值(最多 10 个,内部调用 fromArray)
Observable.just("A", "B", "C");

// 3. fromArray / fromIterable —— 从集合/数组发射
List<String> list = Arrays.asList("a", "b", "c");
Observable.fromIterable(list);

// 4. fromCallable —— 从回调创建(适合有返回值的同步操作)
Observable.fromCallable(() -> {
return heavyComputation(); // 在订阅时执行
});

// 5. defer —— 延迟创建,每次订阅都重新创建
Observable.defer(() -> Observable.just(System.currentTimeMillis()));
// 不同订阅者获取不同的时间戳

// 6. interval —— 定时发射
Observable.interval(1, TimeUnit.SECONDS)
.take(5); // 发射 0, 1, 2, 3, 4

// 7. range —— 发射整数序列
Observable.range(1, 10); // 1 到 10

2.2 create 的底层实现

// Observable.create() 的核心流程
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
// 将 ObservableOnSubscribe 包装为 ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

ObservableCreate 继承自 Observable,在 subscribeActual() 中创建 CreateEmitter 并回调 source.subscribe(emitter)

// ObservableCreate.java (简化)
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); // 先回调 onSubscribe,让观察者可以 dispose
try {
source.subscribe(parent); // 触发数据发射
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}

2.3 冷 Observable vs 热 Observable

  • 冷 Observable(Cold):每个 Observer 订阅时,Observable 都从头开始发射数据流。just、fromArray、create 创建的默认都是冷的。类似看视频点播——每个人从头开始看。
  • 热 Observable(Hot):Observable 的发射独立于订阅,后来的 Observer 只能收到订阅之后的发射。类似直播——你打开的时间点决定了能看到什么。ConnectableObservable(通过 publish() 创建)和 Subject 系列是热的。
// 冷 Observable
Observable<Integer> cold = Observable.range(1, 3);
cold.subscribe(i -> Log.d("A", "" + i)); // 收到: 1, 2, 3
cold.subscribe(i -> Log.d("B", "" + i)); // 收到: 1, 2, 3

// 热 Observable
ConnectableObservable<Integer> hot = Observable.range(1, 3).publish();
hot.subscribe(i -> Log.d("A", "" + i)); // 订阅者 A
hot.connect(); // 开始发射
hot.subscribe(i -> Log.d("B", "" + i)); // 订阅者 B 可能收不到或只收到部分

三、Observer 与订阅流程

3.1 Observer 的四个回调

Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅建立时首先回调,d 可以用来取消订阅
}

@Override
public void onNext(String s) {
// 收到数据
}

@Override
public void onError(Throwable e) {
// 数据流中发生错误,之后不会再收到 onNext
}

@Override
public void onComplete() {
// 数据流结束,之后不会再收到 onNext
}
};

重要规则:onError 和 onComplete 是互斥的——一旦触发其中一个,数据流就终止了,之后不会再触发任何事件。

3.2 subscribe 的调用链

observable.subscribe(observer)
→ Observable.subscribeActual(observer) [抽象方法,由子类实现]
→ source.subscribe(emitter) [开始数据发射]
→ emitter.onNext(data)
→ observer.onNext(data)
→ emitter.onComplete()
→ observer.onComplete()

3.3 Disposable 与资源管理

Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(value -> Log.d("RX", "" + value));

// 5 秒后取消订阅
new Handler().postDelayed(() -> {
if (!disposable.isDisposed()) {
disposable.dispose(); // 停止接收数据,释放资源
}
}, 5000);

CompositeDisposable 用于批量管理多个 Disposable:

CompositeDisposable compositeDisposable = new CompositeDisposable();

compositeDisposable.add(observable1.subscribe(...));
compositeDisposable.add(observable2.subscribe(...));
compositeDisposable.add(observable3.subscribe(...));

// 在 Activity.onDestroy() 中一次性取消所有订阅
@Override
protected void onDestroy() {
compositeDisposable.clear(); // 或 dispose()
super.onDestroy();
}

四、操作符的装饰器模式

4.1 map 操作符原理

map 是 RxJava 中最常用的操作符之一,它将数据流中的每个元素转换为另一种类型。其底层实现是经典的装饰器模式(Decorator Pattern):

// Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
// 创建 ObservableMap —— 包装当前 Observable 和 mapper
return RxJavaPlugins.onAssembly(
new ObservableMap<T, R>(this, mapper));
}
// ObservableMap.java (简化)
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;

@Override
public void subscribeActual(Observer<? super U> t) {
// 创建 MapObserver 包装下游 Observer,然后订阅上游 Observable
source.subscribe(new MapObserver<T, U>(t, function));
}
}

// MapObserver 在收到上游数据时执行 mapper,将结果传给下游
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

@Override
public void onNext(T t) {
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v); // 向下游发射转换后的数据
}
}

装饰器链的构建

observable
.map(mapper1) // → ObservableMap1 (持有 observable, mapper1)
.filter(predicate) // → ObservableFilter (持有 ObservableMap1, predicate)
.map(mapper2) // → ObservableMap2 (持有 ObservableFilter, mapper2)

最终的订阅调用链:

Observer
← MapObserver2 (持有 mapper2)
← FilterObserver (持有 predicate)
← MapObserver1 (持有 mapper1)
← 原始 Observable

4.2 flatMap 操作符原理

flatMap 是 map 的升级版——map 是一对一转换,flatMap 是一对多(每个元素变成一个新的 Observable):

Observable.just("user1", "user2")
.flatMap(userId -> {
// 对每个 userId 发起一个网络请求(返回 Observable)
return apiService.getUser(userId);
})
.subscribe(user -> Log.d("RX", user.getName()));

flatMap 的核心挑战是并发——多个内部 Observable 可能同时发射数据,flatMap 通过 merge 合并它们。其内部实现:

// ObservableFlatMap.java (核心逻辑简化)
@Override
public void onNext(T t) {
ObservableSource<? extends R> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t),
"The mapper returned a null ObservableSource");
} catch (Throwable e) {
onError(e);
return;
}

// 限制并发数(flatMap 默认 maxConcurrency = Integer.MAX_VALUE)
if (maxConcurrency != Integer.MAX_VALUE) {
// 如果达到并发上限,将上游数据放入队列
if (active.get() == maxConcurrency) {
queue.offer(t);
return;
}
}

InnerObserver inner = new InnerObserver(t);
downstream.onSubscribe(inner);
active.incrementAndGet();
p.subscribe(inner); // 订阅内部 Observable
}

flatMap vs concatMap vs switchMap

操作符 行为 适用场景
flatMap 并发合并所有内部 Observable 的数据 多个独立请求同时进行
concatMap 按顺序一个一个处理(前一个完成后才订阅下一个) 需要严格顺序的场景
switchMap 新元素到来时,取消前一个内部 Observable 搜索建议(只关心最新输入)

4.3 lift 操作符——操作符的通用基类

大部分操作符都基于 lift() 实现。lift 是 RxJava 最早的核心设计之一:

public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
ObjectHelper.requireNonNull(lifter, "onLift is null");
return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}

lift 将一个 ObservableOperator 插入到订阅链中,Operator 包装下游的 Observer,然后将包装后的 Observer 传给上游。这与 map 的原理完全一致,map 就是 lift 的一个特例。

五、线程调度(Scheduler)

5.1 subscribeOn vs observeOn

这是面试中最常被问到的 RxJava 问题。

Observable.just("Hello")
.subscribeOn(Schedulers.io()) // 指定上游 Observable 在 IO 线程执行
.observeOn(AndroidSchedulers.mainThread()) // 指定下游 Observer 在主线程接收
.map(s -> s + " World") // 受 subscribeOn 影响,在 IO 线程执行
.observeOn(Schedulers.computation()) // 切换下游的计算线程
.filter(s -> s.length() > 5) // 在 computation 线程执行
.subscribe(result -> { // 在 computation 线程接收
Log.d("RX", result);
});

核心规则

  • subscribeOn 指定上游 Observable 启动时所在的线程;多次调用只有第一个生效(后续的被忽略)
  • observeOn 指定下游 Observer 接收数据的线程;多次调用可以不断切换下游线程

5.2 subscribeOn 实现原理

// ObservableSubscribeOn.java (简化)
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);

// 将 source.subscribe(parent) 提交到 scheduler 的线程池中执行
parent.setDisposable(scheduler.scheduleDirect(() -> {
source.subscribe(parent); // 在 scheduler 指定的线程中执行
}));
}
}

subscribeOn 的原理很简单:它将订阅上游这个操作调度到了目标线程。因为数据发射是由订阅触发的(冷 Observable),所以整个数据流就从调度线程开始执行。

5.3 observeOn 实现原理

observeOn 比 subscribeOn 更复杂,因为它需要在数据流中插入线程切换点:

// ObservableObserveOn.java (简化)
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

final Observer<? super T> downstream;
final Scheduler.Worker worker;
SimpleQueue<T> queue; // 用于缓存数据的队列

@Override
public void onNext(T t) {
// 将数据放入队列
queue.offer(t);
// 调度执行 run() 方法
schedule();
}

void schedule() {
worker.schedule(this); // 提交到目标线程
}

@Override
public void run() {
drainNormal(); // 在目标线程中从队列取数据并发射
}

void drainNormal() {
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;

for (;;) {
T v = q.poll();
if (v == null) break;
a.onNext(v); // 在目标线程中回调下游
}
}
}

observeOn 使用生产者-消费者队列:上游在工作线程中向队列 put 数据,然后触发调度;下游在目标线程中从队列 take 数据并回调 onNext。队列在这里起到了线程间缓冲和通信的作用。

5.4 Schedulers 的内部实现

// Schedulers.computation() 对应
// 线程数 = CPU 核心数,适用于计算密集任务
static final Scheduler COMPUTATION =
RxJavaPlugins.initComputationScheduler(
new ComputationTask());

// Schedulers.io() 对应
// 无界线程池,适用于 IO 密集任务
// 内部有一个 60 秒的保活机制
static final Scheduler IO =
RxJavaPlugins.initIoScheduler(
new IOTask());

// Schedulers.newThread()
// 每次都创建新线程,适用于需要隔离的短任务

// Schedulers.single()
// 单线程,适用于需要顺序执行的任务

// AndroidSchedulers.mainThread()
// 通过 Handler 在主线程执行(RxAndroid 提供)

Android 中的 Scheduler 使用最佳实践

Observable.create(emitter -> {
// 网络请求 —— 在 IO 线程
Response response = apiService.doSomething();
emitter.onNext(response);
})
.subscribeOn(Schedulers.io()) // 上游在 IO 线程
.observeOn(Schedulers.computation()) // 解析工作在计算线程
.map(response -> parser.parse(response))
.observeOn(AndroidSchedulers.mainThread()) // UI 更新在主线程
.subscribe(data -> {
textView.setText(data.toString());
});

六、背压(Backpressure)与 Flowable

6.1 背压问题的产生

当上游 Observable 发射数据的速度超过下游 Observer 处理的速度时,就会产生背压问题。最典型的场景:上游每秒发射 10000 个数据,下游每秒只能处理 100 个。

Observable 默认没有背压策略——数据会积压在操作符内部的缓冲区中,最终导致 OutOfMemoryError

6.2 Flowable 的背压策略

RxJava 2.x 引入了 Flowable 类型来解决背压问题:

// Observable → 不支持背压
// Flowable → 支持背压(通过 Subscription.request() 机制)

Flowable 通过 request(n) 实现响应式拉取(Reactive Pull):

Flowable.create(emitter -> {
for (int i = 0; i < 1000000; i++) {
emitter.onNext(i); // 但是如果下游只 request(10),这里必须等待
}
}, BackpressureStrategy.BUFFER) // 指定背压策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10); // 我只请求 10 个数据
}

@Override
public void onNext(Integer integer) {
// 处理完一个后,可以再次 request
}

@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});

6.3 BackpressureStrategy 详解

策略 行为 适用场景
MISSING 不缓冲不丢弃,由下游通过 request() 控制 下游能够处理所有数据
ERROR 下游处理不过来时抛出 MissingBackpressureException 不允许丢数据的场景
BUFFER 无界缓冲所有数据(可能导致 OOM) 数据量可控的场景
DROP 丢弃下游来不及处理的数据 状态更新(只关心最新值)
LATEST 保留下游来不及处理的最新一个数据 类似 DROP,但保留最新值

七、实战场景

7.1 网络请求处理

// 场景:搜索 + 防抖 + 错误重试 + 线程切换
RxTextView.textChanges(searchEditText) // RxBinding
.debounce(300, TimeUnit.MILLISECONDS) // 300ms 防抖
.filter(text -> text.length() > 1)
.distinctUntilChanged() // 忽略与上次相同的输入
.switchMap(keyword -> {
// switchMap 取消上一次未完成的搜索
return apiService.search(keyword)
.subscribeOn(Schedulers.io())
.retry(2); // 失败重试 2 次
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
results -> adapter.setData(results),
error -> showError(error)
);

7.2 多级缓存

// 场景:内存缓存 → 磁盘缓存 → 网络
Observable.concat(
memoryCache.getData(), // 第一级:内存
diskCache.getData(), // 第二级:磁盘
network.getData() // 第三级:网络
.doOnNext(data -> diskCache.save(data)) // 缓存到磁盘
)
.firstElement() // 取第一个非空数据
.toObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> updateUI(data));

7.3 合并多个数据源

// 场景:首页需要同时展示用户信息和推荐列表
Observable.zip(
apiService.getUserProfile(userId).subscribeOn(Schedulers.io()),
apiService.getRecommendations().subscribeOn(Schedulers.io()),
(user, recommendations) -> new HomePageData(user, recommendations)
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
homeData -> {
displayProfile(homeData.user);
displayRecommendations(homeData.recommendations);
},
error -> showError(error)
);

八、内存泄漏与最佳实践

8.1 RxJava 导致内存泄漏的常见原因

// 问题1:Observable 内部持有 Activity 引用后,观察者没有被 dispose
Observable.interval(1, TimeUnit.SECONDS) // 无限发射
.observeOn(AndroidSchedulers.mainThread())
.subscribe(value -> {
textView.setText("" + value); // textView 持有 Activity 引用
});
// Activity finish 后,interval 仍在发射,textView 持有 Activity → 泄漏

// 解决方案
private CompositeDisposable disposables = new CompositeDisposable();

@Override
protected void onCreate(Bundle savedInstanceState) {
disposables.add(
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(value -> textView.setText("" + value))
);
}

@Override
protected void onDestroy() {
disposables.clear();
super.onDestroy();
}

8.2 使用 AutoDispose / RxLifecycle

// AutoDispose — 推荐
Observable.interval(1, TimeUnit.SECONDS)
.as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
// Activity onStop 时自动 dispose
.subscribe(...);

// RxLifecycle
Observable.interval(1, TimeUnit.SECONDS)
.compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
.subscribe(...);

九、常见面试题

Q1: subscribeOn 和 observeOn 的区别是什么?为什么多次 subscribeOn 只有第一次生效?

A: subscribeOn 影响的是上游 Observable 的订阅线程,即 source.subscribe() 这个操作在哪个线程执行——这意味着数据流的源头在哪个线程启动。observeOn 影响的是下游 Observer 的接收线程,即 onNext/onError/onComplete 在哪个线程回调。多次 subscribeOn 只有第一次生效的原因是:数据流是从上游往下游传播的,第二个 subscribeOn 只会影响它上游之前的代码,但此时第一个 subscribeOn 已经决定了最初 source.subscribe() 的执行线程。简单的记忆法则:subscribeOn 管”从哪里开始”,observeOn 管”接下来去哪”。

Q2: map 和 flatMap 的实现原理有何不同?什么情况下该用哪个?

A: map 是一对一的同步转换。它在当前线程中直接执行 mapper,将结果立即传给下游——没有创建新的 Observable,没有线程切换。flatMap 是一对多的异步转换。它将每个上游数据映射为一个 Observable,然后通过 merge 机制合并这些内部 Observable 的数据。flatMap 的内部是并发的(默认 maxConcurrency 为 Integer.MAX_VALUE),多个内部 Observable 可以同时发射数据。使用场景:map 用于简单的类型转换(如 JSON → Model);flatMap 用于一个输入触发异步操作(如 userId → 网络请求获取用户详情)。需要注意:如果 mapper 返回的是 Observable,使用 flatMap;如果返回值不会异步,用 map 更高效(减少 Observable 创建开销)。

Q3: Observable 和 Flowable 的区别是什么?什么时候应该使用 Flowable?

A: Observable 不支持背压(Backpressure),适用于数据流速度可控的场景(如 UI 事件、网络请求响应等少量数据)。Flowable 支持背压,适用于上游速度快于下游的场景(如传感器数据、文件读取、数据库查询返回大量数据)。Flowable 通过 Subscriber 的 request(n) 机制实现响应式拉取,下游声明自己能处理多少个数据,上游根据此信号控制发射速率。在 Android 开发中,大多数场景可以用 Observable(因为 Android 中数据流通常数据量小、发射间隔长),但当处理 1000+ 条数据库查询结果或文件流时,应使用 Flowable 并设置合适的背压策略。

Q4: RxJava 中的 Disposable 是什么?为什么需要它?如何正确管理 Disposable?

A: Disposable 是 RxJava 中用于取消订阅的接口。每个 subscribe() 调用都返回一个 Disposable。它的作用是:(1) 停止 Observer 接收数据;(2) 释放上游 Observable 持有的资源(如取消网络请求、关闭文件流、停止定时器)。不及时 dispose 会导致内存泄漏——Observable 持有 Activity/Fragment 的引用,即使 Activity 已经 finish 了,Observable 仍在发射数据。正确的管理方式:(1) 使用 CompositeDisposable 聚合所有 Disposable,在 onDestroy() 中调用 compositeDisposable.clear();(2) 使用 RxLifecycle 或 AutoDispose 框架自动管理;(3) 使用 takeUntil() 操作符(如 takeUntil(lifecycleEvent))在特定条件下自动取消订阅。

Q5: RxJava 的操作符链是如何构建的?它使用了什么设计模式?

A: RxJava 的操作符链基于装饰器模式(Decorator Pattern)。每个操作符(map、filter、flatMap)都会创建一个新的 Observable 子类对象,该对象包装(持有)上游的 Observable。当调用 subscribe() 时,订阅从下游向上游反向传播:最终的 Observer 被包装为 FilterObserver,传给 ObservableMap → FilterObserver 包装为 MapObserver,传给 ObservableFilter → MapObserver 包装为原始 Observer,传给原始 Observable → 原始 Observable 开始发射数据 → 数据沿 Observer 链依次经过 MapObserver(执行 mapper)→ FilterObserver(执行 predicate)→ 最终 Observer(接收结果)。这种设计的好处是每个操作符职责单一、可组合,且所有操作符的复用性高。

Q6: switchMap、concatMap 和 flatMap 的区别和使用场景?

A: 三者都执行”一个上游数据映射为一个 Observable”的操作,区别在于对多个内部 Observable 的处理方式。flatMap 并发执行所有内部 Observable——所有内部 Observable 同时启动,谁先返回数据谁先发射,顺序不保证。适用于多个独立的、没有顺序要求的并发请求。concatMap 串行执行——前一个内部 Observable 完成后,才启动下一个。适用于需要严格顺序的场景,如先登录再获取用户数据。switchMap 新来切换——当新的上游数据到来时,如果上一个内部 Observable 还未完成,直接取消它,只处理最新的。适用于搜索建议(用户快速输入时,取消旧的搜索请求,只保留最新的)。性能方面:flatMap 最快(并发),switchMap 其次(取消旧请求节省资源),concatMap 最慢(串行等待)。


参考文档:

打赏
  • 微信
  • 支付宝

评论