深入剖析 Java 并发基石:AbstractQueuedSynchronizer (AQS)
AbstractQueuedSynchronizer(简称 AQS)是 Java 并发包 (java.util.concurrent
, JUC) 中一个至关重要的组件。它是构建锁(如 ReentrantLock
)、信号量(Semaphore
)、倒计时器(CountDownLatch
)等多种同步工具的底层框架。理解 AQS 的工作原理,对于深入掌握 JUC 的精髓、提升并发编程能力,乃至设计自定义同步器都具有不可替代的价值。本文将从 AQS 的设计思想、核心组件、独占与共享模式下的工作流程、Condition
条件队列,以及如何基于 AQS 构建自定义同步器等多个维度进行深入剖析,力求帮助读者彻底理解这一并发编程的“瑞士军刀”。
一、 引言:AQS 的江湖地位与核心价值
1.1 JUC 的“幕后英雄”
如果你使用过 ReentrantLock
、Semaphore
、CountDownLatch
、ReentrantReadWriteLock
或 FutureTask
,那么你已经间接接触到了 AQS。AQS (AbstractQueuedSynchronizer) 正是这些我们耳熟能详的并发工具背后的核心同步控制框架。可以说,AQS 是 Doug Lea 大神为 Java 并发世界贡献的一块坚实基石。
1.2 面临的并发问题与 AQS 的设计初衷
在 AQS 出现之前,如果要实现一个自定义的同步器,开发者需要自行处理线程的排队、阻塞、唤醒、状态管理等一系列复杂且易错的问题。这不仅开发成本高,而且难以保证正确性和高效性。 AQS 的设计初衷便是将这些通用、繁琐的底层同步机制抽象出来,形成一个可复用的框架。开发者只需要关注特定同步场景下的状态(state
)的获取与释放逻辑,而将线程管理、队列操作等公共部分交给 AQS 处理,从而极大地简化了同步器的构建过程。
二、 AQS 核心设计理念与架构概览
AQS 的设计精髓在于它优雅地分离了“同步状态的管理”和“线程阻塞/唤醒机制”。
2.1 模板方法模式的极致运用
AQS 本身是一个抽象类,它采用了经典的模板方法模式。
AQS 定义了同步器实现的核心骨架:它封装了线程的排队、等待、唤醒等通用逻辑。 *
* 子类只需关注特定状态的获取与释放逻辑**:子类通过重写 AQS 提供的一些受保护(
protected
)方法(如tryAcquire
,tryRelease
等)来定义具体的同步语义。
这意味着,当我们需要实现一个新的同步器时,大部分工作 AQS 已经帮我们完成了,我们只需要“填空”即可。
2.2 state
状态变量
AQS 内部维护了一个核心的同步状态变量: private volatile int state;
这个 state
是一个 int
类型的变量,使用 volatile
修饰以保证其在多线程间的可见性。state
的具体含义由子类自行定义。例如:
- 在
ReentrantLock
中,state
表示锁的重入次数。 - 在
Semaphore
中,state
表示当前可用的许可数量。 - 在
CountDownLatch
中,state
表示还需要倒计数的数量。
AQS 提供了三个受保护的方法来原子地操作 state
:
protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)` (CAS 操作)
子类必须通过这三个方法来安全地读取和修改 state
。
2.3 FIFO 同步队列 (CLH 队列变体)
当多个线程竞争同一个同步状态(资源)失败时,它们会被放入一个先进先出(FIFO)的等待队列中。AQS 内部实现的这个队列通常被认为是 CLH (Craig, Landin, and Hagersten) 锁队列的一种变体。
队列结构:这是一个双向链表,每个节点(Node
)代表一个等待获取同步状态的线程。
Node
内部类:Node
是 AQS 的一个静态内部类,其关键属性包括:
volatile int waitStatus
: 节点状态 (如CANCELLED
,SIGNAL
,CONDITION
,PROPAGATE
)。volatile Node prev
: 前驱节点。volatile Node next
: 后继节点。volatile Thread thread
: 当前节点封装的线程。Node nextWaiter
: 用于条件队列,或标记节点是共享模式还是独占模式。
head
和 tail
指针:
head
: 指向队列的头节点。head
节点通常是一个虚拟节点(或称为哨兵节点),它并不代表某个具体的等待线程,而是代表当前持有锁(或已获取资源)的线程(逻辑上)。tail
: 指向队列的尾节点。
CLH 队列的优点是无锁(入队和出队操作主要依赖 CAS),并且能够有效地避免“惊群效应”,只唤醒必要的线程。
2.4 独占模式 (Exclusive) 与共享模式 (Shared)
AQS 支持两种资源访问模式:
独占模式 (Exclusive Mode):资源在同一时刻只能被一个线程持有。例如
ReentrantLock
。AQS 提供了
acquire(int arg)
和release(int arg)
方法用于独占式获取和释放资源。子类需要重写
tryAcquire(int arg)
和tryRelease(int arg)
。共享模式 (Shared Mode):资源在同一时刻可以被多个线程持有。例如
Semaphore
、CountDownLatch
、ReentrantReadWriteLock
的读锁。AQS 提供了
acquireShared(int arg)
和releaseShared(int arg)
方法用于共享式获取和释放资源。子类需要重写
tryAcquireShared(int arg)
和tryReleaseShared(int arg)
。
一个同步器通常只支持其中一种模式,但像 ReentrantReadWriteLock
这样的复杂同步器会同时用到两种模式(写锁独占,读锁共享)。Node
中的 nextWaiter
字段也用于区分节点是独占模式(Node.EXCLUSIVE
)还是共享模式(Node.SHARED
)。
三、 AQS 内部机制深度剖析:独占模式
以独占模式为例,我们来分析线程获取和释放同步状态的详细流程。
3.1 核心入口:acquire(int arg)
当一个线程调用基于 AQS 实现的同步器的 lock()
方法时(通常内部会调用 AQS 的 acquire(int arg)
),流程如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 1. 尝试获取资源
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2. 获取失败,则入队并等待
selfInterrupt(); // 3. 如果等待过程中被中断,则自我中断
}
tryAcquire(int arg)
:这是子类必须实现的关键方法。它尝试以独占方式获取资源(即修改
state
)。如果获取成功,返回
true
,acquire
方法结束。如果获取失败,返回
false
,流程继续。非公平锁:
tryAcquire
通常会直接尝试获取,不管队列中是否有等待者。公平锁:
tryAcquire
会先检查队列中是否有前驱节点(hasQueuedPredecessors()
),如果有,则直接失败,体现公平性。addWaiter(Node mode)
:如果
tryAcquire
失败,说明当前线程无法立即获取资源。AQS 会将当前线程封装成一个
Node
对象,模式为Node.EXCLUSIVE
。然后调用
enq(Node node)
方法将该节点加入到同步队列的尾部。acquireQueued(Node node, int arg)
:这是核心的等待逻辑。节点入队后,该方法会让线程在队列中自旋(有限次)或阻塞,直到它成为头节点的直接后继,并成功获取到资源。
如果线程在等待过程中被中断,
acquireQueued
会返回true
,最终导致selfInterrupt()
被调用,以保留中断状态。
3.2 线程入队:addWaiter(Node mode)
和 enq(Node node)
addWaiter
负责创建节点,enq
负责将节点安全地加入队列尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 创建节点
Node pred = tail; // 获取当前尾节点
if (pred != null) { // 如果队列已存在
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS设置新尾节点
pred.next = node;
return node;
}
}
enq(node); // 如果队列未初始化或CAS失败,则通过enq自旋入队
return node;
}
private Node enq(final Node node) {
for (;;) { // 无限循环,直到成功入队
Node t = tail;
if (t == null) { // 队列未初始化
if (compareAndSetHead(new Node())) // 初始化头节点 (虚拟节点)
tail = head;
} else { // 队列已初始化,将节点链到尾部
node.prev = t;
if (compareAndSetTail(t, node)) { // CAS设置新尾节点
t.next = node;
return t; // 返回旧的尾节点 (即新节点的前驱)
}
}
}
}
enq
方法通过 CAS 操作保证了在并发环境下节点入队的原子性和线程安全。它会不断尝试,直到成功将节点设置为新的 tail
。如果队列尚未初始化(tail == null
),它还会负责初始化 head
节点。
3.3 阻塞与唤醒:acquireQueued(Node node, int arg)
节点入队后,acquireQueued
方法是实际的等待和获取逻辑:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 标记是否成功获取资源
try {
boolean interrupted = false; // 标记等待过程中是否被中断
for (;;) { // 自旋
final Node p = node.predecessor(); // 获取前驱节点
if (p == head && tryAcquire(arg)) { // 如果前驱是头节点,并且成功获取资源
setHead(node); // 将当前节点设为新的头节点
p.next = null; // help GC,断开旧头节点的next指针
failed = false;
return interrupted; // 返回中断状态
}
// 如果获取资源失败,判断是否应该挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 挂起线程,并检查中断状态
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 如果获取失败(如异常),则取消节点
}
}
核心逻辑:
自旋检查:在一个无限循环中,当前线程(封装在
node
中)首先检查其前驱节点p
。获取机会:如果
p
是head
节点(意味着当前节点是队列中的第一个实际等待者),则再次调用tryAcquire(arg)
尝试获取资源。如果成功,将当前节点设置为新的
head
(setHead(node)
),旧的head
节点出队,方法返回。判断是否挂起:如果当前节点不是
head
的直接后继,或者tryAcquire
失败,则调用shouldParkAfterFailedAcquire(p, node)
。shouldParkAfterFailedAcquire
的主要作用是检查前驱节点p
的waitStatus
。如果p.waitStatus
是Node.SIGNAL
,表示前驱节点在释放资源时会唤醒当前节点,那么当前节点可以安全地挂起。如果
p.waitStatus
不是SIGNAL
(例如是0或CANCELLED
),它会尝试将p.waitStatus
设置为Node.SIGNAL
。如果前驱节点是CANCELLED
,则会向前遍历,剔除已取消的节点。挂起线程:如果
shouldParkAfterFailedAcquire
返回true
,则调用parkAndCheckInterrupt()
。LockSupport.park(this)
:实际挂起当前线程,使其进入等待状态。线程被唤醒(通常是被其前驱节点
unparkSuccessor
唤醒)后,parkAndCheckInterrupt
会检查并返回线程在挂起期间是否被中断。
3.4 核心出口:release(int arg)
当持有资源的线程调用 unlock()
方法时(通常内部会调用 AQS 的 release(int arg)
),流程如下:
public final boolean release(int arg) {
if (tryRelease(arg)) { // 1. 尝试释放资源
Node h = head;
if (h != null && h.waitStatus != 0) // 2. 如果头节点存在且状态不是初始值
unparkSuccessor(h); // 3. 唤醒后继节点
return true;
}
return false;
}
tryRelease(int arg)
:子类必须实现的关键方法。它尝试释放资源(即修改
state
)。如果释放成功(通常意味着
state
变为某个特定值,如0),返回true
。如果释放失败(例如,对于可重入锁,
state
减1后仍大于0),返回false
。unparkSuccessor(Node node)
:如果
tryRelease
返回true
且队列的head
节点存在并且其waitStatus
不为0(通常意味着有后继节点需要被唤醒,例如waitStatus
为SIGNAL
),则调用此方法。unparkSuccessor
的目的是找到head
节点的下一个最合适的、未被取消的后继节点,并使用LockSupport.unpark(thread)
唤醒其对应的线程。它通常会从
tail
向前查找,以处理队列中可能存在的CANCELLED
节点。
四、 AQS 内部机制深度剖析:共享模式
共享模式与独占模式的核心流程类似,但在资源获取成功后的传播行为上有所不同。
4.1 核心入口:acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 1. 尝试获取共享资源
doAcquireShared(arg); // 2. 获取失败,则入队等待
}
tryAcquireShared(int arg)
:子类必须实现。尝试获取共享资源。
返回值:
负数:获取失败。
0:获取成功,但后续其他共享线程不能再成功获取(例如,资源刚好被用完)。
正数:获取成功,并且后续其他共享线程可能也能成功获取(资源有剩余)。
doAcquireShared(int arg)
:如果
tryAcquireShared
返回负数,则调用此方法,逻辑与独占模式的acquireQueued
类似,但有关键区别。
4.2 共享式获取:doAcquireShared(int arg)
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 以共享模式入队
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 再次尝试获取
if (r >= 0) { // 获取成功
setHeadAndPropagate(node, r); // 设置头节点并传播唤醒
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
关键在于 setHeadAndPropagate(Node node, int propagate)
:
- 当一个线程成功获取共享资源后(
r >= 0
),它会将自己设为head
。 - 如果
propagate
(即tryAcquireShared
的返回值)大于0,或者旧的head.waitStatus
允许传播(例如是PROPAGATE
状态,或在某些情况下为0),则它会继续唤醒队列中的下一个共享模式的等待节点。这就是所谓的“级联唤醒”或“传播”。 - 这确保了在资源充足的情况下,多个等待共享资源的线程可以被连续唤醒。
4.3 核心出口:releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 1. 尝试释放共享资源
doReleaseShared(); // 2. 唤醒后继节点
return true;
}
return false;
}
tryReleaseShared(int arg)
:子类必须实现。尝试释放共享资源。
如果成功释放并且可能允许其他等待线程获取资源,则返回
true
。doReleaseShared()
:这个方法会无条件地从
head
开始,尝试唤醒后继的共享节点。它会一直传播唤醒信号,直到队列中没有等待的共享节点或者遇到一个非共享模式的节点。
确保在共享资源被释放时,所有符合条件的等待线程都能被及时唤醒。
head
节点的waitStatus
在这里起着关键作用,用于控制传播。
五、 AQS 的“灵魂伴侣”:条件队列 ConditionObject
ConditionObject
是 AQS 的一个内部类,它实现了 接口。Condition
为线程提供了一种比 Object.wait()
/ notify()
/ notifyAll()
更强大、更灵活的等待/通知机制。
5.1 为何需要 Condition
?
Object
的监视器方法(wait
/notify
)有一些局限性:
- 它们与
synchronized
关键字紧密绑定。 - 一个锁对象只有一个等待集,无法区分多个不同的等待条件。
Condition
允许一个 Lock
关联多个条件队列,从而可以为不同的条件创建不同的等待集,实现更精细化的线程协作。
5.2 ConditionObject
内部结构
每个 ConditionObject
实例内部维护一个独立的条件队列(FIFO,单向链表):
firstWaiter
: 指向条件队列的头节点。lastWaiter
: 指向条件队列的尾节点。
这个条件队列中的节点类型也是 AQS 的 Node
,但其 waitStatus
通常是 Node.CONDITION
。 重要:条件队列独立于 AQS 的主同步队列。
5.3 await()
方法详解
当线程在一个 Condition
上调用 await()
时:
- 创建新节点:当前线程被封装成一个
Node
,waitStatus
为Node.CONDITION
,并加入到该ConditionObject
的条件队列尾部。 - 完全释放锁:调用 AQS 的
fullyRelease(Node node)
方法,该方法会释放当前线程持有的所有(包括重入的)AQS 同步状态(锁)。这是至关重要的,因为只有释放了锁,其他线程才有机会获取锁并改变条件。 - 阻塞当前线程:线程被
LockSupport.park()
挂起,等待被signal
。 - 被唤醒后:
- 当其他线程在该
Condition
上调用signal()
或signalAll()
时,条件队列中的某个(或某些)节点会被转移到 AQS 的主同步队列中(transferForSignal()
方法)。 await()
方法被唤醒后,会尝试重新获取之前释放的锁(通过 AQS 的acquireQueued
或类似逻辑)。- 只有成功重新获取锁之后,
await()
方法才会返回。 - 如果在等待过程中或重新获取锁的过程中被中断,
await()
会抛出InterruptedException
(除非是不可中断的等待版本)。
5.4 signal()
/ signalAll()
方法详解
当线程在一个 Condition
上调用 signal()
或 signalAll()
时:
- 检查锁持有:首先会检查当前线程是否是持有与此
Condition
关联的锁。如果不是,会抛出IllegalMonitorStateException
。 - 转移节点:
signal()
: 从条件队列的头部取出一个节点(如果队列不为空),通过transferForSignal(Node node)
方法将其转移到 AQS 的主同步队列尾部。转移后,该节点的waitStatus
会被修改,表示它正在等待获取锁。signalAll()
: 将条件队列中的所有节点依次转移到 AQS 的主同步队列中。- 唤醒:节点被转移到主同步队列后,并不会立即被唤醒。唤醒操作通常是由持有锁的线程在释放锁时(调用
unlock()
->release()
->unparkSuccessor()
)触发,或者在共享模式下由setHeadAndPropagate
触发。被转移的节点会像其他在同步队列中等待的节点一样参与锁的竞争。
sequenceDiagram
participant T as Thread
participant AQS
participant CondQueue
T->>AQS: await()
AQS->>CondQueue: addConditionWaiter()
AQS->>AQS: fullyRelease()
loop 等待信号
AQS->>T: park()
end
T->>AQS: signal()
AQS->>CondQueue: transferAfterSignal()
AQS->>SyncQueue: enq(node)
AQS->>T: unpark()
六、 基于 AQS 构建自定义同步器实战
理解了 AQS 的原理后,我们可以尝试构建自己的同步器。关键在于重写 AQS 提供的钩子方法。
需要重写的方法(根据模式选择):
protected boolean tryAcquire(int arg)
(独占)protected boolean tryRelease(int arg)
(独占)protected int tryAcquireShared(int arg)
(共享)protected boolean tryReleaseShared(int arg)
(共享)protected boolean isHeldExclusively()
(判断当前线程是否独占资源,Condition
需要)
6.1 实现一个不可中断、非公平可重入的独占锁 (ReentrantMutex)
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ReentrantMutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
private Thread owner; // 当前持有锁的线程
@Override
protected boolean isHeldExclusively() {
// 检查状态是否大于0,并且当前线程是否为持有者
return getState() > 0 && owner == Thread.currentThread();
}
@Override
protected boolean tryAcquire(int acquires) { // acquires 通常为1
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 锁未被持有
if (compareAndSetState(0, acquires)) {
owner = current;
return true;
}
} else if (owner == current) { // 锁已被当前线程持有 (重入)
//当时考虑这里要不要使用cas进行改变。但是当前线程是唯一能进入这段代码的线程;所以,此时是“线程安全的上下文”,没有竞争,不需要 CAS
int nextc = c + acquires;
if (nextc < 0) { // 溢出检查
throw new Error("Maximum lock count exceeded");
}
setState(nextc); // 直接设置,因为已持有锁,无需CAS
return true;
}
return false; // 其他线程尝试获取已被持有的锁
}
@Override
protected boolean tryRelease(int releases) { // releases 通常为1
int c = getState() - releases;
if (owner != Thread.currentThread()) { // 非持有者尝试释放
throw new IllegalMonitorStateException("Current thread does not hold the lock");
}
boolean free = false;
if (c == 0) { // 完全释放
free = true;
owner = null;
}
setState(c); // 直接设置
return free; // 返回是否已完全释放
}
// 提供 Condition 支持
final ConditionObject newCondition() {
return new ConditionObject();
}
// (可选) 提供获取持有者的方法
final Thread getOwner() {
return owner;
}
// (可选) 提供获取重入次数的方法
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1); // 不可中断获取
}
@Override
public void unlock() {
sync.release(1);
}
// --- Lock接口的其他方法实现 ---
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
// --- 自定义锁特有方法 ---
public boolean isLocked() {
return sync.getState() > 0;
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getHoldCount();
}
}
state
的含义:state
在这里表示锁的重入次数。当state == 0
时,锁未被任何线程持有。owner
字段:新增一个Thread owner
字段来记录当前持有锁的线程,用于判断重入和防止非持有者释放锁。tryAcquire(int acquires)
:- 如果
state == 0
(锁空闲),通过compareAndSetState(0, acquires)
尝试获取。成功则设置owner
为当前线程。 - 如果
state > 0
且owner == Thread.currentThread()
(当前线程已持有锁),则增加state
(setState(c + acquires)
),实现重入。注意这里不需要 CAS,因为当前线程已独占访问权。 - 其他情况(锁被其他线程持有)则返回
false
。 tryRelease(int releases)
:- 首先检查当前线程是否为
owner
,如果不是则抛出IllegalMonitorStateException
。 - 将
state
减去releases
。 - 如果
state
减为0,则表示锁已完全释放,将owner
设为null
,并返回true
(这将触发 AQS 唤醒等待队列中的下一个线程)。否则返回false
(锁仍被当前线程持有)。 isHeldExclusively()
:判断state > 0
且owner == Thread.currentThread()
。这个方法对于ConditionObject
的正确工作至关重要。newCondition()
: 直接返回sync.newCondition()
,利用 AQS 内建的ConditionObject
。
这个 ReentrantMutex
比 SimpleMutex
更完善,因为它正确处理了重入逻辑,这是实际锁中非常重要的特性。它也暴露了 lockInterruptibly
, tryLock
等 Lock
接口的标准方法,这些方法直接委托给 AQS 的相应实现。
6.2 实现一个简单的计数信号量 (SimpleSemaphore)
一个简单的计数信号量可以使用共享模式:
state
表示当前可用的许可数量。tryAcquireShared(int permits):
循环 CAS 尝试减少
state
。如果
state
减去permits
后小于0,则获取失败 (返回负数)。否则成功,返回
state
减去permits
后的值 (>=0)。tryReleaseShared(int permits):
循环 CAS 尝试增加
state
。始终返回
true
(假设增加总能成功)。
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class SimpleSemaphore {
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // 初始化许可数量
}
// 尝试获取共享许可
@Override
protected int tryAcquireShared(int acquires) { // acquires 通常为需要获取的许可数
for (;;) { // 循环 CAS
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining; // 小于0表示失败,>=0表示成功
}
}
// 尝试释放共享许可
@Override
protected boolean tryReleaseShared(int releases) { // releases 通常为需要释放的许可数
for (;;) { // 循环 CAS
int current = getState();
int next = current + releases;
if (next < current) // 溢出检查
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true; // 释放成功
}
}
}
private final Sync sync;
public SimpleSemaphore(int permits) {
if (permits < 0) throw new IllegalArgumentException("permits must be non-negative.");
sync = new Sync(permits);
}
// 获取一个许可 (不可中断)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 使用 AQS 的共享模式获取
}
// 获取指定数量的许可 (不可中断)
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException("permits must be non-negative.");
sync.acquireSharedInterruptibly(permits);
}
// 释放一个许可
public void release() {
sync.releaseShared(1); // 使用 AQS 的共享模式释放
}
// 释放指定数量的许可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException("permits must be non-negative.");
sync.releaseShared(permits);
}
// 获取当前可用许可数量
public int availablePermits() {
return sync.getState();
}
}
在这个 SimpleSemaphore
的实现中:
state
变量表示当前可用的许可数量。tryAcquireShared(int acquires)
方法尝试获取指定数量的许可,通过 CAS 减少state
。如果剩余许可数量小于0,表示获取失败;否则获取成功,返回剩余许可数量。tryReleaseShared(int releases)
方法尝试释放指定数量的许可,通过 CAS 增加state
。释放通常总是成功的,所以返回true
。acquire()
和release()
方法分别委托给 AQS 的acquireSharedInterruptibly()
和releaseShared()
方法,利用 AQS 的排队和唤醒机制来管理等待许可的线程。