目录
  1. 1. 一、原子操作(Atomic Operations)
    1. 1.1. 1.1 基本原理
    2. 1.2. 1.2 在 Binder 驱动中的应用
  2. 2. 二、自旋锁(Spinlock)
    1. 2.1. 2.1 基本原理
    2. 2.2. 2.2 Binder 驱动中的 Spinlock 分层
    3. 2.3. 2.3 Spinlock 在用户空间的使用
  3. 3. 三、互斥锁(Mutex)
    1. 3.1. 3.1 基本原理
    2. 3.2. 3.2 Binder 驱动中的 Mutex
  4. 4. 四、信号量(Semaphore)
  5. 5. 五、条件变量(Condition Variable)
  6. 6. 六、屏障(Barrier)
  7. 7. 七、RCU(Read-Copy-Update)
    1. 7.1. 7.1 基本原理
    2. 7.2. 7.2 Binder 驱动中的 RCU
  8. 8. 八、Futex(Fast Userspace Mutex)
    1. 8.1. 8.1 基本原理
      1. 8.1.1. 8.2 Futex 的等待唤醒协议
    2. 8.2. 8.3 ART 中 Object.wait/notify 的 Futex 实现
      1. 8.2.1. 8.4 Thin Lock 到 Fat Lock 的升级
  9. 9. 九、Binder 驱动的锁层级设计
  10. 10. 十、Android 特有的同步方案
    1. 10.1. 10.1 Binder oneway
    2. 10.2. 10.2 Handler 同步机制
  11. 11. 十一、核心面试题
【深入内核篇】经典同步机制

并发和同步是操作系统内核设计的核心问题,也是 Android 系统每一个层次——从 Binder 驱动到 ART 虚拟机——都不可回避的主题。本文以 Android/Linux 为背景,深入分析 spinlock、mutex、RCU、atomic 操作、futex、信号量(semaphore)和条件变量(condition variable)这七种经典同步机制,并结合 Binder 驱动和 ART 中的实际应用进行剖析。

一、原子操作(Atomic Operations)

1.1 基本原理

原子操作是最底层的同步原语,由 CPU 指令(如 x86 的 LOCK CMPXCHG,ARM 的 LDREX/STREX)保证操作的不可分割性。原子操作是所有高级同步机制的基础——mutex、spinlock、RCU 的实现都依赖原子 CAS(Compare-And-Swap)操作。

// include/linux/atomic.h 和 arch/arm64/include/asm/atomic.h
// atomic_t 基于 int,atomic64_t 基于 long long

static inline void atomic_add(int i, atomic_t *v)
{
// ARM64 实现(使用 LSE 扩展如果可用)
asm volatile("stadd %w[i], %[v]\n"
: [v] "+Q" (v->counter)
: [i] "r" (i));
}

static inline int atomic_cmpxchg(atomic_t *v, int old, int new)
{
// CAS 操作:如果 v->counter == old,则设为 new
int ret;
asm volatile("cas %w[ret], %w[new], %[v]\n"
: [ret] "=r" (ret), [v] "+Q" (v->counter)
: [new] "r" (new)
: "memory");
return ret;
}

ARM64 的 LSE(Large System Extensions):如果 CPU 支持(ARMv8.1+),使用 LSE 的原子指令(如 stadd、cas)替代 LDREX/STREX 的 load-linked/store-conditional 循环。LSE 指令在单芯片上完成原子操作,性能更高且避免了 LL/SC 的 ABA 问题。

1.2 在 Binder 驱动中的应用

Binder 驱动中的引用计数大量使用原子操作:

// drivers/android/binder.c
static void binder_inc_node(struct binder_node *node, int strong, int weak,
struct list_head *target_list)
{
if (strong) {
// 原子地增加强引用计数
if (target_list)
atomic_inc(&node->tmp_refs);
else
atomic_inc(&node->internal_strong_refs);
}
if (weak)
atomic_inc(&node->local_weak_refs);
}

static bool binder_dec_node(struct binder_node *node, int strong, int weak)
{
if (strong) {
bool free_node;
// 原子地减少强引用计数,并检查是否为 0
if (atomic_dec_and_test(&node->internal_strong_refs)) {
free_node = true;
}
}
if (weak && atomic_dec_and_test(&node->local_weak_refs)) {
// 弱引用也降为 0,可以释放 binder_node
}
}

使用原子操作避免了对多进程共享的 binder_node 引用计数的锁争用。

二、自旋锁(Spinlock)

2.1 基本原理

Spinlock 是最简单的锁实现。当锁被持有时,其他 CPU 核心在其上”自旋”(不断轮询)直到锁被释放。适用于临界区极短的场景——通常在几微秒级别。

// include/linux/spinlock.h
// raw_spin_lock 和 raw_spin_unlock

static inline void raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable(); // 关闭内核抢占
do {
// 原子地尝试获取锁
} while (!arch_spin_trylock(&lock->raw_lock));
}

static inline void raw_spin_unlock(raw_spinlock_t *lock)
{
arch_spin_unlock(&lock->raw_lock);
preempt_enable(); // 重新打开内核抢占
}

为什么 spinlock 必须关闭抢占?

如果在持有 spinlock 时允许内核抢占,以下场景可能导致死锁:

  1. 线程 A 在 CPU 0 上获取 spinlock
  2. 线程 B 被调度到 CPU 0 上,它也想获取同一个 spinlock
  3. B 不断自旋,但 A 由于被抢占而无法释放锁
  4. 死锁——B 自旋永远等待 A,而 A 得不到执行

通过 preempt_disable() 禁止抢占,确保持有锁的线程不会被调出 CPU。

ARM 上的 spinlock 实现:ARM 使用 WFE(Wait For Event)指令配合 SEV(Send Event)指令。当自旋等待时,CPU 进入 WFE 低功耗等待状态,释放锁时通过 SEV 唤醒等待的 CPU。

2.2 Binder 驱动中的 Spinlock 分层

Binder 驱动使用复杂的锁层级来避免死锁。最外层是 binder_procs_lock(保护全局 binder_procs 列表),内层是进程级别的锁,最内层是 binder_node 级别的锁。

// drivers/android/binder.c
// 锁层级定义
enum {
BINDER_LOOPER_STATE_NEED_RETURN = 0x01,
};

// 全局进程表锁
static DEFINE_MUTEX(binder_procs_lock);

// 每个 binder_proc 有自己的锁
// proc->outer_lock 保护 proc 级别
// proc->inner_lock 保护线程/节点级别

// binder_inner_proc_lock(proc) - 获取进程内层锁
// 调用时需要确保已持有 outer_lock 或明确表明只取 inner_lock
static void binder_inner_proc_lock(struct binder_proc *proc)
{
mutex_lock(&proc->inner_lock);
}

2.3 Spinlock 在用户空间的使用

Android 的 pthread 实现(bionic libc)中,pthread_mutex_t 的快速路径使用原子操作,慢速路径才进入 futex。pthread_spinlock_t 则是纯粹的用户空间 spinlock。

三、互斥锁(Mutex)

3.1 基本原理

Mutex 允许锁持有者睡眠(与 spinlock 不同),适用于临界区较长的场景。在 Linux 内核中通过 mutex 结构实现:

// include/linux/mutex.h
struct mutex {
atomic_long_t owner; // 锁持有者的 task_struct 指针
raw_spinlock_t wait_lock;
struct list_head wait_list; // 等待队列
};

// 加锁(可能会睡眠)
void mutex_lock(struct mutex *lock);

// 尝试加锁,不睡眠
int mutex_trylock(struct mutex *lock);

// 解锁
void mutex_unlock(struct mutex *lock);

mutex_lock 内部流程(简化)

  1. 尝试 fastpath:使用 atomic_long_try_cmpxchg 将 owner 从 0 设为 current task
  2. 如果成功,直接返回(无竞争)
  3. 如果失败,进入 slowpath:将 current task 放入 wait_list,设置 TASK_UNINTERRUPTIBLE,调用 schedule() 放弃 CPU
  4. 被唤醒后,再次尝试获取锁

3.2 Binder 驱动中的 Mutex

// drivers/android/binder.c

// 获取节点的锁(mutex)
static int binder_node_lock(struct binder_node *node, bool internal)
{
if (internal)
mutex_lock(&node->internal_lock);
else
mutex_lock(&node->lock);
return 0;
}

// 在 binder_transaction 中
static void binder_transaction(...)
{
// 获取目标进程锁
binder_proc_lock(proc);
binder_inner_proc_lock(target_proc);

// ... 操作 binder_node 和 binder_ref ...

binder_inner_proc_unlock(target_proc);
binder_proc_unlock(proc);
}

四、信号量(Semaphore)

信号量是带有计数器的同步原语,允许多个线程同时访问有限资源。

// include/linux/semaphore.h
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};

// P 操作(等待/减计数)
void down(struct semaphore *sem); // 可能睡眠
int down_trylock(struct semaphore *sem); // 不睡眠
int down_interruptible(struct semaphore *sem); // 可被信号中断

// V 操作(释放/增计数)
void up(struct semaphore *sem);

信号量 vs mutex

  • Mutex 只能有 1 个持有者(二进制),semaphore 可以有 N 个(count 值)
  • Mutex 有明确的 owner 概念(用于优先级继承和调试),semaphore 无
  • Mutex 只能由持有者释放,semaphore 可以由任何线程 up

五、条件变量(Condition Variable)

条件变量配合 mutex 实现等待某个条件成立的同步。

// 典型模式
pthread_mutex_lock(&mutex);
while (!condition) {
pthread_cond_wait(&cond, &mutex); // 释放 mutex 并等待,收到信号后重新获取 mutex
}
// 条件满足,执行操作
pthread_mutex_unlock(&mutex);

为什么必须用 while 而不是 if 检查条件?:因为存在虚假唤醒(spurious wakeup)——操作系统可能在条件不满足时唤醒等待的线程(通常是由于中断或其他系统事件)。使用 while 循环可确保唤醒后重新检查条件。

pthread_cond_signal vs pthread_cond_broadcast

  • signal:唤醒至少一个等待线程(通常是最先等待的那个)
  • broadcast:唤醒所有等待线程
  • 优先使用 broadcast 的场景:当条件变化可能同时满足多个等待线程时

六、屏障(Barrier)

屏障等待所有参与者到达某个同步点后才继续执行:

pthread_barrier_t barrier;
pthread_barrier_init(&barrier, NULL, N); // N 个参与者

// 每个线程在同步点调用
pthread_barrier_wait(&barrier); // 阻塞直到 N 个线程都调用了 wait

在 ART 中,当 GC 需要停止所有 mutator 线程时,使用屏障来同步各线程的 safepoint 到达。

七、RCU(Read-Copy-Update)

7.1 基本原理

RCU 是一种为”读写比例极大”的场景优化的同步机制。基本思想是:读者无锁访问数据(无需获取任何锁),写者先复制数据、修改副本、然后原子地更新指针。

// include/linux/rcupdate.h

// 读者侧
rcu_read_lock();
// 访问受 RCU 保护的数据(无锁,只需禁用抢占)
struct my_data *p = rcu_dereference(global_ptr);
// ... 使用 p ...
rcu_read_unlock();

// 写者侧
struct my_data *new_data = kmalloc(...);
*new_data = *old_data;
new_data->field = new_value;
// 原子地替换指针
rcu_assign_pointer(global_ptr, new_data);
// 等待所有读者完成
synchronize_rcu();
// 释放旧数据
kfree(old_data);

Grace Periodsynchronize_rcu() 等待所有 CPU 至少经历一次上下文切换。因为 rcu_read_lock() 禁用了抢占,上下文切换确保读者已经离开临界区——在上下文切换前被抢占的读者一定已经完成了 rcu_read_unlock()

7.2 Binder 驱动中的 RCU

Binder 驱动在全局 binder_procs 哈希表的查找中使用 RCU。查找 binder_proc 是高频只读操作,而添加/删除 binder_proc 相对很少:

// drivers/android/binder.c
// 查找进程——无需锁!
struct binder_proc *binder_get_proc_rcu(struct binder_proc *proc)
{
// 使用 RCU 保护的哈希表查找
struct binder_proc *p;
hash_for_each_possible_rcu(binder_procs, p, proc_node, pid);
return p;
}

八、Futex(Fast Userspace Mutex)

8.1 基本原理

Futex 是 Linux 提供的最重要的用户空间同步系统调用。它在无竞争时完全在用户空间运行(原子操作),只在有竞争时才陷入内核:

// include/uapi/linux/futex.h
// futex 系统调用原型
long sys_futex(void __user *uaddr, int op, u32 val,
struct __kernel_timespec __user *utime,
void __user *uaddr2, u32 val3);

// 常用操作
#define FUTEX_WAIT 0 // 在 uaddr 处等待(如果 *uaddr == val,则睡眠)
#define FUTEX_WAKE 1 // 唤醒等待在 uaddr 上的最多 val 个线程
#define FUTEX_REQUEUE 3 // 将等待者从 uaddr 移到 uaddr2
#define FUTEX_WAIT_PRIVATE 128 // 进程内 futex(无需全局 hash,性能更好)

8.2 Futex 的等待唤醒协议

FUTEX_WAIT 的原子性:内核先检查 *uaddr == val,只有相等时才睡眠。这解决了 TOCTOU(Time-of-check-time-of-use)竞态——如果解锁发生在检查之后、睡眠之前,futex 保证不会错过唤醒。

// 用户空间的 futex_lock 伪代码
void futex_lock(int *lock) {
int v;
// 尝试 fastpath:CAS 加锁
if (atomic_cmpxchg(lock, 0, 1) == 0) return;

// slowpath:需要等待
while (true) {
// 设置锁为"有等待者"状态(2),确保解锁者会做 FUTEX_WAKE
v = atomic_exchange(lock, 2);
if (v == 0) return; // 恰好此时锁被释放

// 等待 lock == 2
syscall(SYS_futex, lock, FUTEX_WAIT_PRIVATE, 2, NULL, NULL, 0);
}
}

void futex_unlock(int *lock) {
// 将 lock 设为 0,并获取旧值
int v = atomic_exchange(lock, 0);
if (v == 2) {
// 有等待者,唤醒它们
syscall(SYS_futex, lock, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}
}

三个状态值的含义

  • 0 — unlocked(无等待者)
  • 1 — locked(无等待者)
  • 2 — locked(有等待者)

8.3 ART 中 Object.wait/notify 的 Futex 实现

ART 虚拟机使用 futex 来实现 Java 的 Object.wait() / Object.notify()

// art/runtime/base/mutex.cc
// Mutex 的底层实现使用了 futex

void Mutex::Lock(Thread* self) {
if (state_and_contenders_.fetch_or(1, std::memory_order_acquire) == 0) {
return; // 无竞争:快速路径,纯原子操作
}
// 有竞争:进入慢速路径,使用 futex
LockSlowPath(self);
}

void Mutex::LockSlowPath(Thread* self) {
do {
// 尝试 CAS 获取锁
if (state_and_contenders_.compare_exchange_weak(...)) {
return;
}
// 失败则调用 futex(FUTEX_WAIT) 进入睡眠
futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, ...);
} while (true);
}

void Mutex::Unlock(Thread* self) {
if (state_and_contenders_.fetch_and(~1, std::memory_order_release) == 0) {
return; // 无等待者
}
// 有等待者:唤醒
futex(state_and_contenders_.Address(), FUTEX_WAKE_PRIVATE, 1);
}

ART 的 Monitor(对应 Java synchronized 关键字的实现)同样依赖 futex:

// art/runtime/monitor.cc
void Monitor::Wait(Thread* self, int64_t ms, int32_t ns, ...) {
// 通过 Mutex 实现,底层使用 futex(FUTEX_WAIT)
}

void Monitor::Notify(Thread* self) {
// 从 wait_set_ 中取出一个线程,通过 Mutex 的 unlock 唤醒
// 底层使用 futex(FUTEX_WAKE)
}

8.4 Thin Lock 到 Fat Lock 的升级

ART 的 Monitor 实现了锁膨胀(Lock Inflation):

  1. Thin Lock:存储在对象头(lock word)中的 32-bit 值,无竞争时直接原子 CAS 操作
  2. Fat Lock:当竞争发生或需要 wait/notify 时,升级为完整的 Monitor 对象(包含 Mutex 和条件变量)
  3. 升级过程通过 Monitor::InflateThinLocked() 完成,需要分配 Monitor 对象并 atomically 替换对象的 lock word

九、Binder 驱动的锁层级设计

Binder 驱动中有一个严格的锁获取顺序,防止死锁:

binder_procs_lock      (全局互斥锁)
└→ proc->outer_lock (进程级互斥锁)
└→ proc->inner_lock (进程内部互斥锁)
└→ node->lock (binder_node 互斥锁)
└→ node->internal_lock

所有代码路径必须按这个顺序获取锁。Binder 驱动在开发初期曾面临严重的死锁问题,后续通过严格的 lockdep 注解和代码审查解决了这些问题。

// drivers/android/binder.c
// lockdep 注解示例
static void binder_inner_proc_lock(struct binder_proc *proc)
{
mutex_lock_nested(&proc->inner_lock, BINDER_LOOPER_STATE_NEED_RETURN);
}

mutex_lock_nested 的第二个参数是锁层级标记,用于 lockdep 验证——lockdep 会跟踪所有 mutex 的获取顺序,当检测到”先获取 A 再获取 B”和”先获取 B 再获取 A”的不一致顺序时报警。

十、Android 特有的同步方案

10.1 Binder oneway

Binder 的 oneway 事务是异步的——调用方发送事务后立即返回,不等待接收方处理。这是一种”发送即忘”的同步模式:

// AIDL
oneway void notifyEvent(Event event);

oneway 的底层机制:Binder 驱动在接收方线程池有空闲线程时投递事务,如果线程池全忙,事务进入队列。这比传统的同步 Binder 调用更高效(无等待),但牺牲了返回值。

10.2 Handler 同步机制

Android 的 Handler 基于消息队列(MessageQueue)和 Looper,是一种单线程事件循环的同步模式。Handler.post()sendMessage() 是异步非阻塞的;如果需要在子线程等待主线程执行结果,可以使用 CountDownLatchCompletableFuture 配合 Handler。

十一、核心面试题

Q1:spinlock 和 mutex 的核心区别是什么?什么场景用哪个?

核心区别:spinlock 等待时 CPU 忙等(不释放 CPU),mutex 等待时线程进入睡眠(释放 CPU)。因此:spinlock 适用于临界区极小(微秒级)且不能睡眠的场景(如中断处理程序、持有锁时间 < 上下文切换时间),mutex 适用于临界区可能较长、允许睡眠的场景。关键判断:如果临界区中有可能睡眠的操作(如内存分配 GFP_KERNEL、copy_from_user),必须用 mutex 而不能用 spinlock(否则可能死锁)。

Q2:ART 为什么选择基于 futex 实现锁而不是仅依赖 pthread_mutex_t?

ART 需要更精细地控制锁行为:(1) 性能——ART 通过在用户空间用原子变量 CAS 可以避免大量情况下的系统调用,pthread_mutex 的函数调用开销在某些 ARM 芯片上较大。(2) 线程状态管理——ART 需要知道线程是否在等待锁(用于线程 suspend、GC safepoint 等),这需要与运行时深度集成。(3) Monitor 膨胀(thin lock → fat lock)需要精确控制底层 futex 操作,普通 pthread_mutex 无法支持这种动态升级机制。

Q3:RCU 中的 synchronize_rcu 是如何知道所有读者都完成的?为什么说 RCU 读者是无锁的?

RCU 利用 Linux 内核的抢占调度:rcu_read_lock() 实际只做 preempt_disable(),禁止内核抢占;rcu_read_unlock()preempt_enable()synchronize_rcu() 等待所有 CPU 至少经历一次上下文切换(即一个 grace period),这保证了所有在 rcu_read_lock/unlock 区间内的读者都已离开临界区。读者无锁是因为它仅禁用了抢占,没有获取任何锁——这意味着读者零争用开销,非常适合 Binder 驱动中 binder_procs 的查找这种高频只读场景。

Q4:为什么条件变量要配合 while 循环使用?虚假唤醒是什么?

条件变量存在虚假唤醒(spurious wakeup)——操作系统可能在条件不满足时唤醒等待的线程(原因包括 POSIX 允许实现在多处理器上简化实现、信号中断等)。使用 while (!condition) 而不是 if (!condition) 可确保线程被唤醒后重新检查条件,防止在条件未满足的情况下执行临界区代码。这是 POSIX 标准明确允许的行为,所有条件变量使用都必须遵循 while 模式。

Q5:Android 的 Binder 驱动为什么需要如此复杂的锁层级?

Binder 驱动管理着多个进程之间的对象引用关系——binder_node(服务端对象)、binder_ref(客户端引用)、binder_proc(进程上下文)、binder_thread(线程上下文)。这些数据结构之间的引用关系复杂且跨进程。如果使用单一锁保护所有结构,性能将非常差(所有 binder 事务串行化)。但使用多个锁意味着必须定义严格的获取顺序防止死锁——例如,先获取全局锁、再获取进程锁、再获取节点锁。lockdep(Linux 内核的内存锁依赖检测)在这方面起到了关键作用。

AOSP 核心路径参考:

  • drivers/android/binder.c — Binder 驱动锁层级
  • art/runtime/base/mutex.cc — ART Mutex(基于 futex)
  • art/runtime/monitor.cc — ART Monitor(synchronized 实现)
  • include/linux/spinlock.h — Linux spinlock
  • include/linux/mutex.h — Linux mutex
  • include/linux/rcupdate.h — Linux RCU
  • include/uapi/linux/futex.h — Futex 系统调用接口
  • include/linux/semaphore.h — Linux semaphore
打赏
  • 微信
  • 支付宝

评论