一、RxJava 的设计哲学
RxJava 是 ReactiveX(Reactive Extensions)的 Java 实现,它将观察者模式(Observer Pattern)、迭代器模式(Iterator Pattern)和函数式编程融合在一起,提供了一种处理异步数据流的统一编程模型。
RxJava 解决的核心问题:异步操作的组合和编排。在 Android 开发中,我们经常需要处理:网络请求 → 数据库写入 → UI 更新,或者多个并发的网络请求 → 合并结果 → 错误处理。传统的回调(Callback)方式会导致”回调地狱”,而 RxJava 通过操作符(Operator)链将异步逻辑扁平化,使代码可读性大幅提升。
RxJava 的三大核心角色:
- Observable(被观察者):数据的生产者,负责发射数据流
- Observer(观察者):数据的消费者,负责处理接收到的数据
- Operator(操作符):用于转换、过滤、组合数据流
二、Observable 的创建
2.1 创建方式分类
Observable.create(emitter -> { emitter.onNext("Hello"); emitter.onNext("RxJava"); emitter.onComplete(); });
Observable.just("A", "B", "C");
List<String> list = Arrays.asList("a", "b", "c"); Observable.fromIterable(list);
Observable.fromCallable(() -> { return heavyComputation(); });
Observable.defer(() -> Observable.just(System.currentTimeMillis()));
Observable.interval(1, TimeUnit.SECONDS) .take(5);
Observable.range(1, 10);
|
2.2 create 的底层实现
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
|
ObservableCreate 继承自 Observable,在 subscribeActual() 中创建 CreateEmitter 并回调 source.subscribe(emitter):
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } }
|
2.3 冷 Observable vs 热 Observable
- 冷 Observable(Cold):每个 Observer 订阅时,Observable 都从头开始发射数据流。just、fromArray、create 创建的默认都是冷的。类似看视频点播——每个人从头开始看。
- 热 Observable(Hot):Observable 的发射独立于订阅,后来的 Observer 只能收到订阅之后的发射。类似直播——你打开的时间点决定了能看到什么。
ConnectableObservable(通过 publish() 创建)和 Subject 系列是热的。
Observable<Integer> cold = Observable.range(1, 3); cold.subscribe(i -> Log.d("A", "" + i)); cold.subscribe(i -> Log.d("B", "" + i));
ConnectableObservable<Integer> hot = Observable.range(1, 3).publish(); hot.subscribe(i -> Log.d("A", "" + i)); hot.connect(); hot.subscribe(i -> Log.d("B", "" + i));
|
三、Observer 与订阅流程
3.1 Observer 的四个回调
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } };
|
重要规则:onError 和 onComplete 是互斥的——一旦触发其中一个,数据流就终止了,之后不会再触发任何事件。
3.2 subscribe 的调用链
observable.subscribe(observer) → Observable.subscribeActual(observer) [抽象方法,由子类实现] → source.subscribe(emitter) [开始数据发射] → emitter.onNext(data) → observer.onNext(data) → emitter.onComplete() → observer.onComplete()
|
3.3 Disposable 与资源管理
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS) .subscribe(value -> Log.d("RX", "" + value));
new Handler().postDelayed(() -> { if (!disposable.isDisposed()) { disposable.dispose(); } }, 5000);
|
CompositeDisposable 用于批量管理多个 Disposable:
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(observable1.subscribe(...)); compositeDisposable.add(observable2.subscribe(...)); compositeDisposable.add(observable3.subscribe(...));
@Override protected void onDestroy() { compositeDisposable.clear(); super.onDestroy(); }
|
四、操作符的装饰器模式
4.1 map 操作符原理
map 是 RxJava 中最常用的操作符之一,它将数据流中的每个元素转换为另一种类型。其底层实现是经典的装饰器模式(Decorator Pattern):
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly( new ObservableMap<T, R>(this, mapper)); }
|
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } }
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; @Override public void onNext(T t) { U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); } }
|
装饰器链的构建:
observable .map(mapper1) // → ObservableMap1 (持有 observable, mapper1) .filter(predicate) // → ObservableFilter (持有 ObservableMap1, predicate) .map(mapper2) // → ObservableMap2 (持有 ObservableFilter, mapper2)
|
最终的订阅调用链:
Observer ← MapObserver2 (持有 mapper2) ← FilterObserver (持有 predicate) ← MapObserver1 (持有 mapper1) ← 原始 Observable
|
4.2 flatMap 操作符原理
flatMap 是 map 的升级版——map 是一对一转换,flatMap 是一对多(每个元素变成一个新的 Observable):
Observable.just("user1", "user2") .flatMap(userId -> { return apiService.getUser(userId); }) .subscribe(user -> Log.d("RX", user.getName()));
|
flatMap 的核心挑战是并发——多个内部 Observable 可能同时发射数据,flatMap 通过 merge 合并它们。其内部实现:
@Override public void onNext(T t) { ObservableSource<? extends R> p; try { p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable e) { onError(e); return; } if (maxConcurrency != Integer.MAX_VALUE) { if (active.get() == maxConcurrency) { queue.offer(t); return; } } InnerObserver inner = new InnerObserver(t); downstream.onSubscribe(inner); active.incrementAndGet(); p.subscribe(inner); }
|
flatMap vs concatMap vs switchMap:
| 操作符 |
行为 |
适用场景 |
flatMap |
并发合并所有内部 Observable 的数据 |
多个独立请求同时进行 |
concatMap |
按顺序一个一个处理(前一个完成后才订阅下一个) |
需要严格顺序的场景 |
switchMap |
新元素到来时,取消前一个内部 Observable |
搜索建议(只关心最新输入) |
4.3 lift 操作符——操作符的通用基类
大部分操作符都基于 lift() 实现。lift 是 RxJava 最早的核心设计之一:
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) { ObjectHelper.requireNonNull(lifter, "onLift is null"); return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter)); }
|
lift 将一个 ObservableOperator 插入到订阅链中,Operator 包装下游的 Observer,然后将包装后的 Observer 传给上游。这与 map 的原理完全一致,map 就是 lift 的一个特例。
五、线程调度(Scheduler)
5.1 subscribeOn vs observeOn
这是面试中最常被问到的 RxJava 问题。
Observable.just("Hello") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(s -> s + " World") .observeOn(Schedulers.computation()) .filter(s -> s.length() > 5) .subscribe(result -> { Log.d("RX", result); });
|
核心规则:
subscribeOn 指定上游 Observable 启动时所在的线程;多次调用只有第一个生效(后续的被忽略)
observeOn 指定下游 Observer 接收数据的线程;多次调用可以不断切换下游线程
5.2 subscribeOn 实现原理
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(() -> { source.subscribe(parent); })); } }
|
subscribeOn 的原理很简单:它将订阅上游这个操作调度到了目标线程。因为数据发射是由订阅触发的(冷 Observable),所以整个数据流就从调度线程开始执行。
5.3 observeOn 实现原理
observeOn 比 subscribeOn 更复杂,因为它需要在数据流中插入线程切换点:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { final Observer<? super T> downstream; final Scheduler.Worker worker; SimpleQueue<T> queue; @Override public void onNext(T t) { queue.offer(t); schedule(); } void schedule() { worker.schedule(this); } @Override public void run() { drainNormal(); } void drainNormal() { final SimpleQueue<T> q = queue; final Observer<? super T> a = downstream; for (;;) { T v = q.poll(); if (v == null) break; a.onNext(v); } } }
|
observeOn 使用生产者-消费者队列:上游在工作线程中向队列 put 数据,然后触发调度;下游在目标线程中从队列 take 数据并回调 onNext。队列在这里起到了线程间缓冲和通信的作用。
5.4 Schedulers 的内部实现
static final Scheduler COMPUTATION = RxJavaPlugins.initComputationScheduler( new ComputationTask());
static final Scheduler IO = RxJavaPlugins.initIoScheduler( new IOTask());
|
Android 中的 Scheduler 使用最佳实践:
Observable.create(emitter -> { Response response = apiService.doSomething(); emitter.onNext(response); }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .map(response -> parser.parse(response)) .observeOn(AndroidSchedulers.mainThread()) .subscribe(data -> { textView.setText(data.toString()); });
|
六、背压(Backpressure)与 Flowable
6.1 背压问题的产生
当上游 Observable 发射数据的速度超过下游 Observer 处理的速度时,就会产生背压问题。最典型的场景:上游每秒发射 10000 个数据,下游每秒只能处理 100 个。
Observable 默认没有背压策略——数据会积压在操作符内部的缓冲区中,最终导致 OutOfMemoryError。
6.2 Flowable 的背压策略
RxJava 2.x 引入了 Flowable 类型来解决背压问题:
Flowable 通过 request(n) 实现响应式拉取(Reactive Pull):
Flowable.create(emitter -> { for (int i = 0; i < 1000000; i++) { emitter.onNext(i); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(10); } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });
|
6.3 BackpressureStrategy 详解
| 策略 |
行为 |
适用场景 |
MISSING |
不缓冲不丢弃,由下游通过 request() 控制 |
下游能够处理所有数据 |
ERROR |
下游处理不过来时抛出 MissingBackpressureException |
不允许丢数据的场景 |
BUFFER |
无界缓冲所有数据(可能导致 OOM) |
数据量可控的场景 |
DROP |
丢弃下游来不及处理的数据 |
状态更新(只关心最新值) |
LATEST |
保留下游来不及处理的最新一个数据 |
类似 DROP,但保留最新值 |
七、实战场景
7.1 网络请求处理
RxTextView.textChanges(searchEditText) .debounce(300, TimeUnit.MILLISECONDS) .filter(text -> text.length() > 1) .distinctUntilChanged() .switchMap(keyword -> { return apiService.search(keyword) .subscribeOn(Schedulers.io()) .retry(2); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe( results -> adapter.setData(results), error -> showError(error) );
|
7.2 多级缓存
Observable.concat( memoryCache.getData(), diskCache.getData(), network.getData() .doOnNext(data -> diskCache.save(data)) ) .firstElement() .toObservable() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(data -> updateUI(data));
|
7.3 合并多个数据源
Observable.zip( apiService.getUserProfile(userId).subscribeOn(Schedulers.io()), apiService.getRecommendations().subscribeOn(Schedulers.io()), (user, recommendations) -> new HomePageData(user, recommendations) ) .observeOn(AndroidSchedulers.mainThread()) .subscribe( homeData -> { displayProfile(homeData.user); displayRecommendations(homeData.recommendations); }, error -> showError(error) );
|
八、内存泄漏与最佳实践
8.1 RxJava 导致内存泄漏的常见原因
Observable.interval(1, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(value -> { textView.setText("" + value); });
private CompositeDisposable disposables = new CompositeDisposable();
@Override protected void onCreate(Bundle savedInstanceState) { disposables.add( Observable.interval(1, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(value -> textView.setText("" + value)) ); }
@Override protected void onDestroy() { disposables.clear(); super.onDestroy(); }
|
8.2 使用 AutoDispose / RxLifecycle
Observable.interval(1, TimeUnit.SECONDS) .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this))) .subscribe(...);
Observable.interval(1, TimeUnit.SECONDS) .compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY)) .subscribe(...);
|
九、常见面试题
Q1: subscribeOn 和 observeOn 的区别是什么?为什么多次 subscribeOn 只有第一次生效?
A: subscribeOn 影响的是上游 Observable 的订阅线程,即 source.subscribe() 这个操作在哪个线程执行——这意味着数据流的源头在哪个线程启动。observeOn 影响的是下游 Observer 的接收线程,即 onNext/onError/onComplete 在哪个线程回调。多次 subscribeOn 只有第一次生效的原因是:数据流是从上游往下游传播的,第二个 subscribeOn 只会影响它上游之前的代码,但此时第一个 subscribeOn 已经决定了最初 source.subscribe() 的执行线程。简单的记忆法则:subscribeOn 管”从哪里开始”,observeOn 管”接下来去哪”。
Q2: map 和 flatMap 的实现原理有何不同?什么情况下该用哪个?
A: map 是一对一的同步转换。它在当前线程中直接执行 mapper,将结果立即传给下游——没有创建新的 Observable,没有线程切换。flatMap 是一对多的异步转换。它将每个上游数据映射为一个 Observable,然后通过 merge 机制合并这些内部 Observable 的数据。flatMap 的内部是并发的(默认 maxConcurrency 为 Integer.MAX_VALUE),多个内部 Observable 可以同时发射数据。使用场景:map 用于简单的类型转换(如 JSON → Model);flatMap 用于一个输入触发异步操作(如 userId → 网络请求获取用户详情)。需要注意:如果 mapper 返回的是 Observable,使用 flatMap;如果返回值不会异步,用 map 更高效(减少 Observable 创建开销)。
Q3: Observable 和 Flowable 的区别是什么?什么时候应该使用 Flowable?
A: Observable 不支持背压(Backpressure),适用于数据流速度可控的场景(如 UI 事件、网络请求响应等少量数据)。Flowable 支持背压,适用于上游速度快于下游的场景(如传感器数据、文件读取、数据库查询返回大量数据)。Flowable 通过 Subscriber 的 request(n) 机制实现响应式拉取,下游声明自己能处理多少个数据,上游根据此信号控制发射速率。在 Android 开发中,大多数场景可以用 Observable(因为 Android 中数据流通常数据量小、发射间隔长),但当处理 1000+ 条数据库查询结果或文件流时,应使用 Flowable 并设置合适的背压策略。
Q4: RxJava 中的 Disposable 是什么?为什么需要它?如何正确管理 Disposable?
A: Disposable 是 RxJava 中用于取消订阅的接口。每个 subscribe() 调用都返回一个 Disposable。它的作用是:(1) 停止 Observer 接收数据;(2) 释放上游 Observable 持有的资源(如取消网络请求、关闭文件流、停止定时器)。不及时 dispose 会导致内存泄漏——Observable 持有 Activity/Fragment 的引用,即使 Activity 已经 finish 了,Observable 仍在发射数据。正确的管理方式:(1) 使用 CompositeDisposable 聚合所有 Disposable,在 onDestroy() 中调用 compositeDisposable.clear();(2) 使用 RxLifecycle 或 AutoDispose 框架自动管理;(3) 使用 takeUntil() 操作符(如 takeUntil(lifecycleEvent))在特定条件下自动取消订阅。
Q5: RxJava 的操作符链是如何构建的?它使用了什么设计模式?
A: RxJava 的操作符链基于装饰器模式(Decorator Pattern)。每个操作符(map、filter、flatMap)都会创建一个新的 Observable 子类对象,该对象包装(持有)上游的 Observable。当调用 subscribe() 时,订阅从下游向上游反向传播:最终的 Observer 被包装为 FilterObserver,传给 ObservableMap → FilterObserver 包装为 MapObserver,传给 ObservableFilter → MapObserver 包装为原始 Observer,传给原始 Observable → 原始 Observable 开始发射数据 → 数据沿 Observer 链依次经过 MapObserver(执行 mapper)→ FilterObserver(执行 predicate)→ 最终 Observer(接收结果)。这种设计的好处是每个操作符职责单一、可组合,且所有操作符的复用性高。
Q6: switchMap、concatMap 和 flatMap 的区别和使用场景?
A: 三者都执行”一个上游数据映射为一个 Observable”的操作,区别在于对多个内部 Observable 的处理方式。flatMap 并发执行所有内部 Observable——所有内部 Observable 同时启动,谁先返回数据谁先发射,顺序不保证。适用于多个独立的、没有顺序要求的并发请求。concatMap 串行执行——前一个内部 Observable 完成后,才启动下一个。适用于需要严格顺序的场景,如先登录再获取用户数据。switchMap 新来切换——当新的上游数据到来时,如果上一个内部 Observable 还未完成,直接取消它,只处理最新的。适用于搜索建议(用户快速输入时,取消旧的搜索请求,只保留最新的)。性能方面:flatMap 最快(并发),switchMap 其次(取消旧请求节省资源),concatMap 最慢(串行等待)。
参考文档: