一、响应式编程的核心思想
响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。在命令式编程中,表达式 a = b + c 只在赋值那一刻计算一次;而在响应式编程中,当 b 或 c 发生变化时,a 会自动更新。RxJava 将这一思想带入 JVM 生态,用 Observable(可观察对象) 表示数据流,用 Observer(观察者) 订阅并响应这些数据流。
RxJava 的四个核心角色:
- ObservableSource:数据的生产者,负责发射数据(onNext)、发射完成信号(onComplete)或错误信号(onError)。
- Observer:数据的消费者,通过
subscribe(Observer)与上游建立订阅关系。 - Disposable:用于取消订阅,防止内存泄漏。调用
dispose()后,上游将停止发射数据。 - Operator(操作符):在上下游之间对数据流进行变换、过滤、聚合等操作。
RxJava 对 Reactive Streams 规范的实现体现在 Flowable 中,其核心接口定义在 org.reactivestreams 包的四个接口中:Publisher<T>、Subscriber<T>、Subscription、Processor<T,R>。这套规范已被纳入 JDK 9 的 java.util.concurrent.Flow。
// 最简单的订阅模型 |
这条链路的执行流程是:just 发射两个字符串 → map 转换为大写 → filter 过滤 → 订阅者打印。每一层操作符都创建一个新的 Observable,形成装饰器模式的嵌套结构。
二、Observable 族的四种类型
RxJava 2.x/3.x 提供了五种响应式类型,它们各有适用场景:
2.1 Observable
发射 0 到 N 个数据,以 onComplete 或 onError 终止。不支持背压,适用于 GUI 事件、少量数据流(最多几千个元素)。典型场景:按钮点击事件、单个网络请求响应。
2.2 Flowable
发射 0 到 N 个数据,支持背压。实现了 Reactive Streams 规范,通过 request(n) 让下游控制上游的发射速率。适用于大量数据流:数据库查询结果、文件读取、传感器事件流。额外开销比 Observable 大(需要维护请求计数和队列)。
2.3 Single
发射恰好 1 个数据或 1 个错误。没有 onComplete 回调——onSuccess 替代了 onNext+onComplete。适用于”请求-响应”模式:网络请求、数据库查询。内部实现比 Observable 简单,不需要处理多元素和完成信号的关系。
Single.just("Hello") |
2.4 Completable
只关心操作是否完成,不发射数据。只有 onComplete 和 onError 两种终止信号。适用于:写入数据库、文件操作、发送分析事件——只关心成功或失败,不需要返回值。
Completable.fromAction(() -> database.deleteAll()) |
2.5 Maybe
结合了 Single 和 Completable:可能发射 1 个数据,也可能不发射任何数据就完成,或发射一个错误。适用于:从缓存中查找数据——可能找到(onSuccess)、可能找不到(onComplete)、可能出错(onError)。
Maybe.fromCallable(() -> cache.get("key")) |
选择指南:
| 类型 | 数据量 | 完成信号 | 背压 | 典型场景 |
|---|---|---|---|---|
| Observable | 0..N | onComplete | 无 | UI 事件、单个 API 调用 |
| Flowable | 0..N | onComplete | 有 | 大数据流、数据库查询 |
| Single | 1 | onSuccess | 无 | 网络请求 |
| Completable | 0 | onComplete | 无 | 写操作 |
| Maybe | 0..1 | onComplete/onSuccess | 无 | 缓存查询 |
三、Observable/Observer 订阅模型深入
在 RxJava 2.x 源码中,Observable 是一个抽象类,内部持有 ObservableSource<T> 接口引用。ObservableCreate 是 Observable 最基础的实现类,构造函数接收 ObservableOnSubscribe<T>,后者定义了 subscribe(ObservableEmitter<T> emitter) 方法。
// RxJava 2.x 源码:io.reactivex.ObservableCreate |
注意这里的 subscribeActual 方法。Observable 的 subscribe(Observer) 方法最终会调用子类的 subscribeActual。这是模板方法模式——父类 Observable 定义 subscribe 的通用逻辑(如空检查、线程调度),子类实现 subscribeActual 完成具体的发射逻辑。
CreateEmitter 内部持有 Observer 的引用,负责将上游的 onNext/onError/onComplete 转发给下游,同时检查 isDisposed() 状态,一旦取消订阅就停止发射。这实现了背压(Backpressure)的最基本形式——通过 Disposable 终止数据流。
// CreateEmitter 的部分源码 |
DisposableHelper 是一个工具类,使用 CAS 原子操作管理 Disposable 的状态。它将 Disposable 引用封装在 AtomicReference<Disposable> 中:
// io.reactivex.internal.disposables.DisposableHelper |
这个枚举实现非常巧妙:使用单例 DISPOSED 作为已取消状态的标记,CAS 保证线程安全。一旦 getAndSet(DISPOSED) 成功,后续所有 isDisposed() 检查都返回 true,且 dispose() 只执行一次。
四、操作符链与装饰器模式
操作符是 RxJava 的核心竞争力。常用的变换类操作符(map、flatMap、concatMap、switchMap)、过滤类操作符(filter、take、skip、distinct)、组合类操作符(zip、merge、combineLatest)都以 Observable 的子类实现。
以 map 操作符为例,RxJava 2.x 的 ObservableMap 类源码:
// io.reactivex.internal.operators.observable.ObservableMap |
典型的操作符链 Observable.just(1).map(x -> x * 2).filter(x -> x > 0) 构建出的对象图:
ObservableJust → ObservableMap → ObservableFilter |
每个操作符都持有上游的引用(upstream/source),订阅时自下而上逐层包装 Observer,数据发射时再自上而下逐层传递。这就是 assembly-time(装配期) 与 subscription-time(订阅期) 的区分:
- 装配期:操作符链中的对象被创建,形成嵌套引用。此时数据尚未流动。
- 订阅期:
subscribe()被调用,从下游向上游逐层调用subscribe,每一层创建对应的 Observer 包装,最终触发顶层的subscribeActual。
这一设计的精妙之处在于:数据流是惰性的(lazy),只有在订阅时才真正启动;而且每一层操作符之间是完全解耦的,可以自由组合。
4.1 flatMap 与内部 Observable 管理
flatMap 的操作更加复杂。它将每个上游元素映射为一个 Observable,然后将这些 Observable 合并(merge) 为一个输出流。注意 flatMap 不保证事件顺序,因为多个内部 Observable 可能交错发射。如果需要顺序保证,应使用 concatMap。
// flatMap 合并内部 Observable 的核心逻辑(简化) |
concatMap 的区别在于它维护一个内部队列,前一个 Observable 完成后再订阅下一个,保证顺序。switchMap 则在新 Observable 到来时取消(dispose)前一个内部 Observable 的订阅——适用于搜索框输入联想场景(只需最新搜索词的结果)。
4.2 操作符融合(Operator Fusion)
RxJava 2.x 引入了操作符融合机制,这是一项重要的性能优化。BasicFuseableObserver 中的 “Fuseable” 就是这个概念。当相邻的两个操作符满足条件时,可以通过 QueueDisposable / QueueSubscription 接口直接传递数据,跳过 Observer 包装的开销。
融合分两种模式:
- 同步融合(SYNC):上游
Observable.just()直接提供一个已计算完成的数据队列,下游map的 Observer 无需通过 onNext 回调,而是直接通过poll()获取数据并变换。 - 异步融合(ASYNC):上游是异步数据源(如
Observable.range()),下游操作符通过requestFusion()协商进入融合模式,用poll()+drain()的拉取模式替代逐次 onNext 推送。
// BasicFuseableObserver 中的融合协商 |
融合情况下,map 不再用 onNext 逐个接收数据,而是从 queue.poll() 批量拉取。这显著减少了方法调用栈深度和 Observer 链上的逐层转发开销。
五、线程调度与 Scheduler 机制
RxJava 的线程切换通过 subscribeOn 和 observeOn 完成。这是面试中的高频考点。
subscribeOn:指定上游 Observable 的工作线程,即 subscribeActual 中发射数据的线程。如果在链中多次调用 subscribeOn,只有第一个(离源头最近的)生效。原因是下游调用 subscribeOn 时,上游已经在上一个 subscribeOn 指定的线程上被订阅了。
observeOn:指定下游 Observer 接收数据的线程。可以在链中多次调用,每次调用后的下游都在新线程上工作。
Scheduler 的实现核心是 Scheduler.Worker,它是一个可调度的执行单元。RxJava 内置几种 Scheduler:
| Scheduler | 底层实现 | 适用场景 |
|---|---|---|
Schedulers.io() |
无上限的 CachedThreadPool | 网络请求、文件 I/O |
Schedulers.computation() |
固定大小(CPU 核数)的线程池 | 计算密集型操作 |
Schedulers.newThread() |
每次新建线程 | 需要完全隔离的任务 |
AndroidSchedulers.mainThread() |
Handler(Looper.getMainLooper()) | UI 更新 |
Schedulers.single() |
单线程 | 需要顺序执行的任务 |
Schedulers.trampoline() |
当前线程的队列 | 递归调度避免栈溢出 |
5.1 IoScheduler 内部实现
// io.reactivex.internal.schedulers.IoScheduler |
关键设计:Schedulers.io() 虽然”无上限”,但并非每次创建新线程。它使用线程池缓存,核心线程数为 0,最大线程数为 Integer.MAX_VALUE,线程空闲 60 秒后回收。这使得 IO 调度器能应对大量并发 IO 请求,同时空闲时不会持有不必要的线程。
5.2 AndroidSchedulers.mainThread() 内部实现
AndroidSchedulers.mainThread() 的实现原理非常简单:
// rxandroid: AndroidSchedulers.java |
HandlerScheduler 通过 Handler.sendMessageDelayed() 将任务投递到主线程的 MessageQueue。ScheduledRunnable 实现了 Disposable,可以通过 handler.removeCallbacks() 取消尚未执行的任务。message.obj = this 的设置用于在取消时找到对应的 Message(通过 handler.removeCallbacksAndMessages(this))。
源码路径:rxandroid/src/main/java/rx/android/schedulers/HandlerScheduler.java
5.3 subscribeOn 和 observeOn 的线程切换原理
subscribeOn 的实现 —— 以 ObservableSubscribeOn 为例:
// io.reactivex.internal.operators.observable.ObservableSubscribeOn |
observeOn 的实现 —— 以 ObservableObserveOn 为例:
// io.reactivex.internal.operators.observable.ObservableObserveOn |
observeOn 的核心是一个生产者-消费者队列:上游的 onNext 将数据放入 SpscArrayQueue(单生产者单消费者无锁队列),然后在目标 Scheduler 上调度一个 Worker 消费队列,消费时逐条调用下游 Observer 的 onNext。
六、背压(Backpressure)策略
当上游发射数据的速度超过下游消费的速度时,就会产生背压。RxJava 2.x 将 Observable 拆分为两类:
- Observable:不支持背压(适合少量数据、GUI 事件)
- Flowable:支持背压(适合大量数据、流式处理)
Flowable 的背压策略通过 BackpressureStrategy 枚举定义(源码路径 io.reactivex.BackpressureStrategy):
- MISSING:不做任何处理,下游需要自行通过
request(n)控制速率。数据可能溢出导致MissingBackpressureException。 - ERROR:当数据堆积时直接抛出
MissingBackpressureException。 - BUFFER:无界缓冲,所有数据先入队列。可能导致 OOM。
- DROP:丢弃下游来不及处理的数据。
- LATEST:仅保留最新的一个数据,旧的被丢弃。
Flowable 内部的背压协议基于 Reactive Streams 规范的 Subscription.request(n) 机制:
// 下游告诉上游:我准备好了,给我 n 个数据 |
在 FlowableCreate 中,FlowableEmitter 实现了 request(n) 的感知:
// FlowableCreate.java - 关键字段 |
io.reactivex.internal.util.BackpressureHelper 包含用于管理请求计数的工具方法,使用 CAS(Compare-And-Swap)保证原子性:
public static long produced(AtomicLong requested, long n) { |
Long.MAX_VALUE 的语义是”下游接受无限数据”(如调用 request(Long.MAX_VALUE)),此时 produced() 直接返回,无任何限制。这是 Flowable 内建的”退路”——如果某些操作符不支持背压,可以直接请求最大值来退化成无背压行为。
6.1 Flowable 内部队列
FlowableCreate 内部根据不同 BackpressureStrategy 使用不同的队列实现:
// FlowableCreate 内部 |
七、Disposable 与内存泄漏防护
任何订阅如果不取消,都可能导致 Activity/Fragment 无法被 GC 回收。RxJava 提供了三级防御机制:
第一级:手动管理 Disposable
private CompositeDisposable compositeDisposable = new CompositeDisposable(); |
CompositeDisposable 内部使用 OpenHashSet<Disposable> 存储,并提供原子性的 add/delete/clear 操作。clear() 不仅清空集合,还会对每个 Disposable 调用 dispose()。
第二级:RxLifecycle / AutoDispose
AutoDispose 通过拦截 subscribe 调用,自动在生命周期结束时取消订阅。其核心是在 Observer 和上游之间插入一个 AutoDisposingObserver,监听 LifecycleOwner 的 ON_DESTROY 事件。
AutoDispose 使用 as() 操作符和 ScopeProvider 模式:
// 使用 AutoDispose |
AndroidLifecycleScopeProvider 监听 Lifecycle 事件,在 ON_DESTROY 时触发 dispose 信号。具体实现是在 LiveDataReactiveStreams 中将 Lifecycle 的 ON_DESTROY 事件映射为 emit(Unit) 到 Maybe<Lifecycle.Event>,然后 AutoDisposingObserver 在收到事件时 dispose 上游。
第三级:UndeliverableException 处理
当 onError 发生后订阅已终止,后续的事件无法投递,会抛出 UndeliverableException。应通过 RxJavaPlugins.setErrorHandler() 设置全局异常处理器:
RxJavaPlugins.setErrorHandler(e -> { |
7.1 dispose 的原子性传播
dispose 的传播方向是从下游向上游。DisposableHelper.dispose() 首先断开下游的 Disposable 引用(通过设置 CAS 为 DISPOSED),然后调用上游的 dispose。这是一个原子操作——一旦 CAS 成功,后续所有 onNext/onError/onComplete 调用都会在 isDisposed() 检查处被拦截。
八、RxJava 2.x 与 3.x 的差异
RxJava 3.x 基于 Java 8,引入了 java.util.concurrent.Flow 兼容性。关键变化包括:
- 所有操作符、类使用 Java 8 方法引用和 lambda
Completable、Single、Maybe的操作符增加Flowable可直接转换为Flow.Publisher- 移除了
Observable.flatMap()过时方法,统一到flatMap的规范实现 - 引入了
@CheckReturnValue注解,所有返回 Disposable/Subscription 的操作符方法都被标记,防止丢失订阅引用
生产环境建议使用 3.x,2.x 已进入维护模式。
8.1 RxJava 与 Kotlin Coroutines 的对比
随着 Kotlin 在 Android 开发中的普及,Coroutines 逐渐成为 RxJava 的主要替代方案。
| 维度 | RxJava | Kotlin Coroutines |
|---|---|---|
| 编程模型 | 响应式流(Observable/Flowable) | 结构化并发(suspend/Flow) |
| 线程切换 | subscribeOn/observeOn | withContext/flowOn |
| 背压 | Flowable 原生支持 | Flow 原生支持(Channel.BUFFERED 等) |
| 错误处理 | onError 回调链 | try-catch / catch 操作符 |
| 操作符丰富度 | 极丰富(200+ 操作符) | 基础齐全(正在快速增长) |
| 学习曲线 | 陡峭 | 相对平缓 |
| 调试体验 | 困难(操作符链深,堆栈难读) | 相对容易(suspend 堆栈可读性好) |
| 生命周期绑定 | 需额外库(AutoDispose) | 天然与 lifecycleScope 集成 |
| Java 互操作 | 原生支持 | 需要少量适配 |
| 性能 | 有装饰器链开销 | 有协程切换开销,通常更轻量 |
RxJava 的优势在于操作符生态极其丰富,对于复杂的流转换(如 debounce + switchMap + distinctUntilChanged 的搜索框场景)有成熟的范式。Coroutines 的优势在于与 Kotlin 语言深度集成,代码更接近同步写法,调试更简单。
九、RxJavaPlugins 全局钩子
RxJavaPlugins 是 RxJava 提供的全局 Hook 机制,主要有三个用途:
- 全局错误处理:
setErrorHandler() - 调度器替换:
setInitIoSchedulerHandler()/setInitComputationSchedulerHandler()等在 Scheduler 创建前注入自定义实例 - Observable 创建拦截:
setOnObservableAssembly()对所有 Observalbe 装配过程注入监控
// 全局监控所有 Observable 的创建(调试/性能分析用) |
这是测试和生产环境中实现统一日志、性能追踪的入口。
十、面试常问题目
Q1: subscribeOn 和 observeOn 的区别?多次调用 subscribeOn 会怎样?
subscribeOn 影响上游数据发射所在的线程,只第一个生效(最近的源头)。observeOn 影响下游数据接收所在的线程,可以多次调用以切换不同阶段的线程。例如:.subscribeOn(Schedulers.io()).map(...).observeOn(AndroidSchedulers.mainThread()).subscribe(...) 表示在 IO 线程发数据、做 map 变换,在主线程接收结果。
Q2: map 和 flatMap 的区别是什么?
map 是一对一变换,将 T 类型转换为 U 类型;flatMap 是一对多变换,将 T 映射为一个 Observable<U>,然后将所有内部 Observable 合并(merge)输出。flatMap 不能保证顺序,concatMap 保证顺序但有性能开销(需要等前一个内部 Observable 完成),switchMap 在新事件到来时取消旧的内部 Observable。
Q3: RxJava 如何防止内存泄漏?必须手动 dispose 吗?
RxJava 提供 CompositeDisposable 进行批量取消;推荐使用 AutoDispose 或 RxLifecycle 将订阅绑定到生命周期;也可使用 takeUntil() 操作符配合生命周期事件自动取消。核心原则:每个 subscribe 返回的 Disposable 必须在适当生命周期方法中 dispose。
Q4: Observable 和 Flowable 的核心区别是什么?
Observable 不支持背压,适用于少量、低频事件(GUI 点击、单个网络请求结果)。Flowable 支持背压,实现了 Reactive Streams 规范,通过 request(n) 机制让下游控制上游发射速率,适用于大量数据流(数据库查询结果、传感器事件流)。Flowable 额外开销更大(需维护原子计数器和队列),仅在需要背压时使用。
Q5: 操作符融合(Operator Fusion)解决了什么问题?
操作符融合解决了操作符链中逐层包装 Observer 带来的性能开销。当相邻的两个操作符满足条件时(如 Observable.just() + map),它们可以通过 QueueDisposable 接口直接推送/拉取数据,跳过 Observer 的一次包装和方法调用。这是 RxJava 2.x 的重要性能优化,在大量数据的场景(如 range().map().filter())下性能提升可达 30-50%。
参考源码路径:
- RxJava 核心:
https://github.com/ReactiveX/RxJava/tree/2.x/src/main/java/io/reactivex - Observable 创建:
io.reactivex.internal.operators.observable.ObservableCreate - Map 操作符:
io.reactivex.internal.operators.observable.ObservableMap - 背压策略:
io.reactivex.BackpressureStrategy - Flowable 创建:
io.reactivex.internal.operators.flowable.FlowableCreate - RxAndroid Scheduler:
rxandroid/src/main/java/rx/android/schedulers/AndroidSchedulers.java - Reactive Streams 规范:
http://www.reactive-streams.org/ - AutoDispose:
https://github.com/uber/AutoDispose - Operator Fusion:
io.reactivex.internal.fuseable.QueueDisposable







