一、响应式编程的核心思想
响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。在命令式编程中,表达式 a = b + c 只在赋值那一刻计算一次;而在响应式编程中,当 b 或 c 发生变化时,a 会自动更新。RxJava 将这一思想带入 JVM 生态,用 Observable(可观察对象) 表示数据流,用 Observer(观察者) 订阅并响应这些数据流。
RxJava 的四个核心角色:
- ObservableSource:数据的生产者,负责发射数据(onNext)、发射完成信号(onComplete)或错误信号(onError)。
- Observer:数据的消费者,通过
subscribe(Observer)与上游建立订阅关系。 - Disposable:用于取消订阅,防止内存泄漏。调用
dispose()后,上游将停止发射数据。 - Operator(操作符):在上下游之间对数据流进行变换、过滤、聚合等操作。
RxJava 对 Reactive Streams 规范的实现体现在 Flowable 中,其核心接口定义在 org.reactivestreams 包的四个接口中:Publisher<T>、Subscriber<T>、Subscription、Processor<T,R>。这套规范已被纳入 JDK 9 的 java.util.concurrent.Flow。
// 最简单的订阅模型 |
这条链路的执行流程是:just 发射两个字符串 → map 转换为大写 → filter 过滤 → 订阅者打印。每一层操作符都创建一个新的 Observable,形成装饰器模式的嵌套结构。
二、Observable/Observer 订阅模型深入
在 RxJava 2.x 源码中,Observable 是一个抽象类,内部持有 ObservableSource<T> 接口引用。ObservableCreate 是 Observable 最基础的实现类,构造函数接收 ObservableOnSubscribe<T>,后者定义了 subscribe(ObservableEmitter<T> emitter) 方法。
// RxJava 2.x 源码:io.reactivex.ObservableCreate |
注意这里的 subscribeActual 方法。Observable 的 subscribe(Observer) 方法最终会调用子类的 subscribeActual。这是模板方法模式——父类 Observable 定义 subscribe 的通用逻辑(如空检查、线程调度),子类实现 subscribeActual 完成具体的发射逻辑。
CreateEmitter 内部持有 Observer 的引用,负责将上游的 onNext/onError/onComplete 转发给下游,同时检查 isDisposed() 状态,一旦取消订阅就停止发射。这实现了背压(Backpressure)的最基本形式——通过 Disposable 终止数据流。
// CreateEmitter 的部分源码 |
三、操作符链与装饰器模式
操作符是 RxJava 的核心竞争力。常用的变换类操作符(map、flatMap、concatMap、switchMap)、过滤类操作符(filter、take、skip、distinct)、组合类操作符(zip、merge、combineLatest)都以 Observable 的子类实现。
以 map 操作符为例,RxJava 2.x 的 ObservableMap 类源码:
// io.reactivex.internal.operators.observable.ObservableMap |
典型的操作符链 Observable.just(1).map(x -> x * 2).filter(x -> x > 0) 构建出的对象图:
ObservableJust → ObservableMap → ObservableFilter |
每个操作符都持有上游的引用(upstream/source),订阅时自下而上逐层包装 Observer,数据发射时再自上而下逐层传递。这就是 assembly-time(装配期) 与 subscription-time(订阅期) 的区分:
- 装配期:操作符链中的对象被创建,形成嵌套引用。此时数据尚未流动。
- 订阅期:
subscribe()被调用,从下游向上游逐层调用subscribe,每一层创建对应的 Observer 包装,最终触发顶层的subscribeActual。
这一设计的精妙之处在于:数据流是惰性的(lazy),只有在订阅时才真正启动;而且每一层操作符之间是完全解耦的,可以自由组合。
flatMap 的操作更加复杂。它将每个上游元素映射为一个 Observable,然后将这些 Observable 合并(merge) 为一个输出流。注意 flatMap 不保证事件顺序,因为多个内部 Observable 可能交错发射。如果需要顺序保证,应使用 concatMap。
// flatMap 合并内部 Observable 的核心逻辑(简化) |
四、线程调度与 Scheduler 机制
RxJava 的线程切换通过 subscribeOn 和 observeOn 完成。这是面试中的高频考点。
subscribeOn:指定上游 Observable 的工作线程,即 subscribeActual 中发射数据的线程。如果在链中多次调用 subscribeOn,只有第一个(离源头最近的)生效。原因是下游调用 subscribeOn 时,上游已经在上一个 subscribeOn 指定的线程上被订阅了。
observeOn:指定下游 Observer 接收数据的线程。可以在链中多次调用,每次调用后的下游都在新线程上工作。
Scheduler 的实现核心是 Scheduler.Worker,它是一个可调度的执行单元。RxJava 内置几种 Scheduler:
| Scheduler | 底层实现 | 适用场景 |
|---|---|---|
Schedulers.io() |
无上限的 CachedThreadPool | 网络请求、文件 I/O |
Schedulers.computation() |
固定大小(CPU 核数)的线程池 | 计算密集型操作 |
Schedulers.newThread() |
每次新建线程 | 需要完全隔离的任务 |
AndroidSchedulers.mainThread() |
Handler(Looper.getMainLooper()) | UI 更新 |
Schedulers.single() |
单线程 | 需要顺序执行的任务 |
AndroidSchedulers.mainThread() 的实现原理非常简单:
// rxandroid: AndroidSchedulers.java |
源码路径:rxandroid/src/main/java/rx/android/schedulers/HandlerScheduler.java
五、背压(Backpressure)策略
当上游发射数据的速度超过下游消费的速度时,就会产生背压。RxJava 2.x 将 Observable 拆分为两类:
- Observable:不支持背压(适合少量数据、GUI 事件)
- Flowable:支持背压(适合大量数据、流式处理)
Flowable 的背压策略通过 BackpressureStrategy 枚举定义(源码路径 io.reactivex.BackpressureStrategy):
- MISSING:不做任何处理,下游需要自行通过
request(n)控制速率。数据可能溢出导致MissingBackpressureException。 - ERROR:当数据堆积时直接抛出
MissingBackpressureException。 - BUFFER:无界缓冲,所有数据先入队列。可能导致 OOM。
- DROP:丢弃下游来不及处理的数据。
- LATEST:仅保留最新的一个数据,旧的被丢弃。
Flowable 内部的背压协议基于 Reactive Streams 规范的 Subscription.request(n) 机制:
// 下游告诉上游:我准备好了,给我 n 个数据 |
在 FlowableCreate 中,FlowableEmitter 实现了 request(n) 的感知:
// FlowableCreate.java - 关键字段 |
io.reactivex.internal.util.BackpressureHelper 包含用于管理请求计数的工具方法,使用 CAS(Compare-And-Swap)保证原子性。
六、Disposable 与内存泄漏防护
任何订阅如果不取消,都可能导致 Activity/Fragment 无法被 GC 回收。RxJava 提供了三级防御机制:
第一级:手动管理 Disposable
private CompositeDisposable compositeDisposable = new CompositeDisposable(); |
第二级:RxLifecycle / AutoDispose
AutoDispose 通过拦截 subscribe 调用,自动在生命周期结束时取消订阅。其核心是在 Observer 和上游之间插入一个 AutoDisposingObserver,监听 LifecycleOwner 的 ON_DESTROY 事件。
第三级:UndeliverableException 处理
当 onError 发生后订阅已终止,后续的事件无法投递,会抛出 UndeliverableException。应通过 RxJavaPlugins.setErrorHandler() 设置全局异常处理器:
RxJavaPlugins.setErrorHandler(e -> { |
七、RxJava 2.x 与 3.x 的差异
RxJava 3.x 基于 Java 8,引入了 java.util.concurrent.Flow 兼容性。关键变化包括:
- 所有操作符、类使用 Java 8 方法引用和 lambda
Completable、Single、Maybe的操作符增加Flowable可直接转换为Flow.Publisher- 移除了
Observable.flatMap()过时方法,统一到flatMap的规范实现
生产环境建议使用 3.x,2.x 已进入维护模式。
八、面试常问题目
Q1: subscribeOn 和 observeOn 的区别?多次调用 subscribeOn 会怎样?
subscribeOn 影响上游数据发射所在的线程,只第一个生效(最近的源头)。observeOn 影响下游数据接收所在的线程,可以多次调用以切换不同阶段的线程。例如:.subscribeOn(Schedulers.io()).map(...).observeOn(AndroidSchedulers.mainThread()).subscribe(...) 表示在 IO 线程发数据、做 map 变换,在主线程接收结果。
Q2: map 和 flatMap 的区别是什么?
map 是一对一变换,将 T 类型转换为 U 类型;flatMap 是一对多变换,将 T 映射为一个 Observable<U>,然后将所有内部 Observable 合并(merge)输出。flatMap 不能保证顺序,concatMap 保证顺序但有性能开销,switchMap 在新事件到来时取消旧的内部 Observable。
Q3: RxJava 如何防止内存泄漏?必须手动 dispose 吗?
RxJava 提供 CompositeDisposable 进行批量取消;推荐使用 AutoDispose 或 RxLifecycle 将订阅绑定到生命周期;也可使用 takeUntil() 操作符配合生命周期事件自动取消。核心原则:每个 subscribe 返回的 Disposable 必须在适当生命周期方法中 dispose。
Q4: Observable 和 Flowable 的核心区别是什么?
Observable 不支持背压,适用于少量、低频事件(GUI 点击、单个网络请求结果)。Flowable 支持背压,实现了 Reactive Streams 规范,通过 request(n) 机制让下游控制上游发射速率,适用于大量数据流(数据库查询结果、传感器事件流)。Flowable 额外开销更大,仅在需要背压时使用。
参考源码路径:
- RxJava 核心:
https://github.com/ReactiveX/RxJava/tree/2.x/src/main/java/io/reactivex - Observable 创建:
io.reactivex.internal.operators.observable.ObservableCreate - Map 操作符:
io.reactivex.internal.operators.observable.ObservableMap - 背压策略:
io.reactivex.BackpressureStrategy - Flowable 创建:
io.reactivex.internal.operators.flowable.FlowableCreate - RxAndroid Scheduler:
rxandroid/src/main/java/rx/android/schedulers/AndroidSchedulers.java - Reactive Streams 规范:
http://www.reactive-streams.org/







