深入剖析 Java 并发基石:AbstractQueuedSynchronizer (AQS)

AbstractQueuedSynchronizer(简称 AQS)是 Java 并发包 (java.util.concurrent, JUC) 中一个至关重要的组件。它是构建锁(如 ReentrantLock)、信号量(Semaphore)、倒计时器(CountDownLatch)等多种同步工具的底层框架。理解 AQS 的工作原理,对于深入掌握 JUC 的精髓、提升并发编程能力,乃至设计自定义同步器都具有不可替代的价值。本文将从 AQS 的设计思想、核心组件、独占与共享模式下的工作流程、Condition 条件队列,以及如何基于 AQS 构建自定义同步器等多个维度进行深入剖析,力求帮助读者彻底理解这一并发编程的“瑞士军刀”。

一、 引言:AQS 的江湖地位与核心价值

1.1 JUC 的“幕后英雄”

如果你使用过 ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLockFutureTask,那么你已经间接接触到了 AQS。AQS (AbstractQueuedSynchronizer) 正是这些我们耳熟能详的并发工具背后的核心同步控制框架。可以说,AQS 是 Doug Lea 大神为 Java 并发世界贡献的一块坚实基石。

1.2 面临的并发问题与 AQS 的设计初衷

在 AQS 出现之前,如果要实现一个自定义的同步器,开发者需要自行处理线程的排队、阻塞、唤醒、状态管理等一系列复杂且易错的问题。这不仅开发成本高,而且难以保证正确性和高效性。 AQS 的设计初衷便是将这些通用、繁琐的底层同步机制抽象出来,形成一个可复用的框架。开发者只需要关注特定同步场景下的状态(state)的获取与释放逻辑,而将线程管理、队列操作等公共部分交给 AQS 处理,从而极大地简化了同步器的构建过程。

二、 AQS 核心设计理念与架构概览

AQS 的设计精髓在于它优雅地分离了“同步状态的管理”和“线程阻塞/唤醒机制”。

2.1 模板方法模式的极致运用

AQS 本身是一个抽象类,它采用了经典的模板方法模式

  • AQS 定义了同步器实现的核心骨架:它封装了线程的排队、等待、唤醒等通用逻辑。 *

  • * 子类只需关注特定状态的获取与释放逻辑**:子类通过重写 AQS 提供的一些受保护(protected)方法(如 tryAcquire, tryRelease 等)来定义具体的同步语义。

这意味着,当我们需要实现一个新的同步器时,大部分工作 AQS 已经帮我们完成了,我们只需要“填空”即可。

2.2 state 状态变量

AQS 内部维护了一个核心的同步状态变量: private volatile int state;

这个 state 是一个 int 类型的变量,使用 volatile 修饰以保证其在多线程间的可见性。state 的具体含义由子类自行定义。例如:

  • ReentrantLock 中,state 表示锁的重入次数。
  • Semaphore 中,state 表示当前可用的许可数量。
  • CountDownLatch 中,state 表示还需要倒计数的数量。

AQS 提供了三个受保护的方法来原子地操作 state

protected final int getState()

protected final void setState(int newState)

protected final boolean compareAndSetState(int expect, int update)` (CAS 操作)

子类必须通过这三个方法来安全地读取和修改 state

2.3 FIFO 同步队列 (CLH 队列变体)

当多个线程竞争同一个同步状态(资源)失败时,它们会被放入一个先进先出(FIFO)的等待队列中。AQS 内部实现的这个队列通常被认为是 CLH (Craig, Landin, and Hagersten) 锁队列的一种变体。

队列结构:这是一个双向链表,每个节点(Node)代表一个等待获取同步状态的线程。

Node 内部类Node 是 AQS 的一个静态内部类,其关键属性包括:

  • volatile int waitStatus: 节点状态 (如 CANCELLED, SIGNAL, CONDITION, PROPAGATE)。
  • volatile Node prev: 前驱节点。
  • volatile Node next: 后继节点。
  • volatile Thread thread: 当前节点封装的线程。
  • Node nextWaiter: 用于条件队列,或标记节点是共享模式还是独占模式。

headtail 指针

  • head: 指向队列的头节点。head 节点通常是一个虚拟节点(或称为哨兵节点),它并不代表某个具体的等待线程,而是代表当前持有锁(或已获取资源)的线程(逻辑上)。
  • tail: 指向队列的尾节点。

CLH 队列的优点是无锁(入队和出队操作主要依赖 CAS),并且能够有效地避免“惊群效应”,只唤醒必要的线程。

2.4 独占模式 (Exclusive) 与共享模式 (Shared)

AQS 支持两种资源访问模式:

  1. 独占模式 (Exclusive Mode):资源在同一时刻只能被一个线程持有。例如 ReentrantLock

  2. AQS 提供了 acquire(int arg)release(int arg) 方法用于独占式获取和释放资源。

  3. 子类需要重写 tryAcquire(int arg)tryRelease(int arg)

  4. 共享模式 (Shared Mode):资源在同一时刻可以被多个线程持有。例如 SemaphoreCountDownLatchReentrantReadWriteLock 的读锁。

  5. AQS 提供了 acquireShared(int arg)releaseShared(int arg) 方法用于共享式获取和释放资源。

  6. 子类需要重写 tryAcquireShared(int arg)tryReleaseShared(int arg)

一个同步器通常只支持其中一种模式,但像 ReentrantReadWriteLock 这样的复杂同步器会同时用到两种模式(写锁独占,读锁共享)。Node 中的 nextWaiter 字段也用于区分节点是独占模式(Node.EXCLUSIVE)还是共享模式(Node.SHARED)。

三、 AQS 内部机制深度剖析:独占模式

以独占模式为例,我们来分析线程获取和释放同步状态的详细流程。

3.1 核心入口:acquire(int arg)

当一个线程调用基于 AQS 实现的同步器的 lock() 方法时(通常内部会调用 AQS 的 acquire(int arg)),流程如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 1. 尝试获取资源
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 2. 获取失败,则入队并等待
        selfInterrupt(); // 3. 如果等待过程中被中断,则自我中断
}
  1. tryAcquire(int arg):

  2. 这是子类必须实现的关键方法。它尝试以独占方式获取资源(即修改 state)。

  3. 如果获取成功,返回 trueacquire 方法结束。

  4. 如果获取失败,返回 false,流程继续。

  5. 非公平锁tryAcquire 通常会直接尝试获取,不管队列中是否有等待者。

  6. 公平锁tryAcquire 会先检查队列中是否有前驱节点(hasQueuedPredecessors()),如果有,则直接失败,体现公平性。

  7. addWaiter(Node mode):

  8. 如果 tryAcquire 失败,说明当前线程无法立即获取资源。

  9. AQS 会将当前线程封装成一个 Node 对象,模式为 Node.EXCLUSIVE

  10. 然后调用 enq(Node node) 方法将该节点加入到同步队列的尾部。

  11. acquireQueued(Node node, int arg):

  12. 这是核心的等待逻辑。节点入队后,该方法会让线程在队列中自旋(有限次)或阻塞,直到它成为头节点的直接后继,并成功获取到资源。

  13. 如果线程在等待过程中被中断,acquireQueued 会返回 true,最终导致 selfInterrupt() 被调用,以保留中断状态。

3.2 线程入队:addWaiter(Node mode)enq(Node node)

addWaiter 负责创建节点,enq 负责将节点安全地加入队列尾部。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // 创建节点
    Node pred = tail; // 获取当前尾节点
    if (pred != null) { // 如果队列已存在
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // CAS设置新尾节点
            pred.next = node;
            return node;
        }
    }
    enq(node); // 如果队列未初始化或CAS失败,则通过enq自旋入队
    return node;
}

private Node enq(final Node node) {
    for (;;) { // 无限循环,直到成功入队
        Node t = tail;
        if (t == null) { // 队列未初始化
            if (compareAndSetHead(new Node())) // 初始化头节点 (虚拟节点)
                tail = head;
        } else { // 队列已初始化,将节点链到尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) { // CAS设置新尾节点
                t.next = node;
                return t; // 返回旧的尾节点 (即新节点的前驱)
            }
        }
    }
}

enq 方法通过 CAS 操作保证了在并发环境下节点入队的原子性和线程安全。它会不断尝试,直到成功将节点设置为新的 tail。如果队列尚未初始化(tail == null),它还会负责初始化 head 节点。

3.3 阻塞与唤醒:acquireQueued(Node node, int arg)

节点入队后,acquireQueued 方法是实际的等待和获取逻辑:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 标记是否成功获取资源
    try {
        boolean interrupted = false; // 标记等待过程中是否被中断
        for (;;) { // 自旋
            final Node p = node.predecessor(); // 获取前驱节点
            if (p == head && tryAcquire(arg)) { // 如果前驱是头节点,并且成功获取资源
                setHead(node); // 将当前节点设为新的头节点
                p.next = null; // help GC,断开旧头节点的next指针
                failed = false;
                return interrupted; // 返回中断状态
            }
            // 如果获取资源失败,判断是否应该挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 挂起线程,并检查中断状态
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node); // 如果获取失败(如异常),则取消节点
    }
}

核心逻辑:

  1. 自旋检查:在一个无限循环中,当前线程(封装在 node 中)首先检查其前驱节点 p

  2. 获取机会:如果 phead 节点(意味着当前节点是队列中的第一个实际等待者),则再次调用 tryAcquire(arg) 尝试获取资源。

  3. 如果成功,将当前节点设置为新的 headsetHead(node)),旧的 head 节点出队,方法返回。

  4. 判断是否挂起:如果当前节点不是 head 的直接后继,或者 tryAcquire 失败,则调用 shouldParkAfterFailedAcquire(p, node)

  5. shouldParkAfterFailedAcquire 的主要作用是检查前驱节点 pwaitStatus。如果 p.waitStatusNode.SIGNAL,表示前驱节点在释放资源时会唤醒当前节点,那么当前节点可以安全地挂起。

  6. 如果 p.waitStatus 不是 SIGNAL(例如是0或CANCELLED),它会尝试将 p.waitStatus 设置为 Node.SIGNAL。如果前驱节点是 CANCELLED,则会向前遍历,剔除已取消的节点。

  7. 挂起线程:如果 shouldParkAfterFailedAcquire 返回 true,则调用 parkAndCheckInterrupt()

  8. LockSupport.park(this):实际挂起当前线程,使其进入等待状态。

  9. 线程被唤醒(通常是被其前驱节点 unparkSuccessor 唤醒)后,parkAndCheckInterrupt 会检查并返回线程在挂起期间是否被中断。

3.4 核心出口:release(int arg)

当持有资源的线程调用 unlock() 方法时(通常内部会调用 AQS 的 release(int arg)),流程如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 1. 尝试释放资源
        Node h = head;
        if (h != null && h.waitStatus != 0) // 2. 如果头节点存在且状态不是初始值
            unparkSuccessor(h); // 3. 唤醒后继节点
        return true;
    }
    return false;
}
  1. tryRelease(int arg):

  2. 子类必须实现的关键方法。它尝试释放资源(即修改 state)。

  3. 如果释放成功(通常意味着 state 变为某个特定值,如0),返回 true

  4. 如果释放失败(例如,对于可重入锁,state 减1后仍大于0),返回 false

  5. unparkSuccessor(Node node):

  6. 如果 tryRelease 返回 true 且队列的 head 节点存在并且其 waitStatus 不为0(通常意味着有后继节点需要被唤醒,例如 waitStatusSIGNAL),则调用此方法。

  7. unparkSuccessor 的目的是找到 head 节点的下一个最合适的、未被取消的后继节点,并使用 LockSupport.unpark(thread) 唤醒其对应的线程。

  8. 它通常会从 tail 向前查找,以处理队列中可能存在的 CANCELLED 节点。

四、 AQS 内部机制深度剖析:共享模式

共享模式与独占模式的核心流程类似,但在资源获取成功后的传播行为上有所不同。

4.1 核心入口:acquireShared(int arg)

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 1. 尝试获取共享资源
        doAcquireShared(arg);       // 2. 获取失败,则入队等待
}
  1. tryAcquireShared(int arg):

  2. 子类必须实现。尝试获取共享资源。

  3. 返回值:

  4. 负数:获取失败。

  5. 0:获取成功,但后续其他共享线程不能再成功获取(例如,资源刚好被用完)。

  6. 正数:获取成功,并且后续其他共享线程可能也能成功获取(资源有剩余)。

  7. doAcquireShared(int arg):

  8. 如果 tryAcquireShared 返回负数,则调用此方法,逻辑与独占模式的 acquireQueued 类似,但有关键区别。

4.2 共享式获取:doAcquireShared(int arg)

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 以共享模式入队
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); // 再次尝试获取
                if (r >= 0) { // 获取成功
                    setHeadAndPropagate(node, r); // 设置头节点并传播唤醒
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

关键在于 setHeadAndPropagate(Node node, int propagate)

  • 当一个线程成功获取共享资源后(r >= 0),它会将自己设为 head
  • 如果 propagate(即 tryAcquireShared 的返回值)大于0,或者旧的 head.waitStatus 允许传播(例如是 PROPAGATE 状态,或在某些情况下为0),则它会继续唤醒队列中的下一个共享模式的等待节点。这就是所谓的“级联唤醒”或“传播”。
  • 这确保了在资源充足的情况下,多个等待共享资源的线程可以被连续唤醒。

4.3 核心出口:releaseShared(int arg)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 1. 尝试释放共享资源
        doReleaseShared();        // 2. 唤醒后继节点
        return true;
    }
    return false;
}
  1. tryReleaseShared(int arg):

  2. 子类必须实现。尝试释放共享资源。

  3. 如果成功释放并且可能允许其他等待线程获取资源,则返回 true

  4. doReleaseShared():

  5. 这个方法会无条件地从 head 开始,尝试唤醒后继的共享节点。

  6. 它会一直传播唤醒信号,直到队列中没有等待的共享节点或者遇到一个非共享模式的节点。

  7. 确保在共享资源被释放时,所有符合条件的等待线程都能被及时唤醒。head 节点的 waitStatus 在这里起着关键作用,用于控制传播。

五、 AQS 的“灵魂伴侣”:条件队列 ConditionObject

ConditionObject 是 AQS 的一个内部类,它实现了 接口。Condition 为线程提供了一种比 Object.wait() / notify() / notifyAll() 更强大、更灵活的等待/通知机制。

5.1 为何需要 Condition

Object 的监视器方法(wait/notify)有一些局限性:

  • 它们与 synchronized 关键字紧密绑定。
  • 一个锁对象只有一个等待集,无法区分多个不同的等待条件。

Condition 允许一个 Lock 关联多个条件队列,从而可以为不同的条件创建不同的等待集,实现更精细化的线程协作。

5.2 ConditionObject 内部结构

每个 ConditionObject 实例内部维护一个独立的条件队列(FIFO,单向链表):

  • firstWaiter: 指向条件队列的头节点。
  • lastWaiter: 指向条件队列的尾节点。

这个条件队列中的节点类型也是 AQS 的 Node,但其 waitStatus 通常是 Node.CONDITION重要:条件队列独立于 AQS 的主同步队列。

5.3 await() 方法详解

当线程在一个 Condition 上调用 await() 时:

  1. 创建新节点:当前线程被封装成一个 NodewaitStatusNode.CONDITION,并加入到该 ConditionObject 的条件队列尾部。
  2. 完全释放锁:调用 AQS 的 fullyRelease(Node node) 方法,该方法会释放当前线程持有的所有(包括重入的)AQS 同步状态(锁)。这是至关重要的,因为只有释放了锁,其他线程才有机会获取锁并改变条件。
  3. 阻塞当前线程:线程被 LockSupport.park() 挂起,等待被 signal
  4. 被唤醒后:
  5. 当其他线程在该 Condition 上调用 signal()signalAll() 时,条件队列中的某个(或某些)节点会被转移到 AQS 的主同步队列中(transferForSignal() 方法)。
  6. await() 方法被唤醒后,会尝试重新获取之前释放的锁(通过 AQS 的 acquireQueued 或类似逻辑)。
  7. 只有成功重新获取锁之后,await() 方法才会返回。
  8. 如果在等待过程中或重新获取锁的过程中被中断,await() 会抛出 InterruptedException(除非是不可中断的等待版本)。

5.4 signal() / signalAll() 方法详解

当线程在一个 Condition 上调用 signal()signalAll() 时:

  1. 检查锁持有:首先会检查当前线程是否是持有与此 Condition 关联的锁。如果不是,会抛出 IllegalMonitorStateException
  2. 转移节点:
  3. signal(): 从条件队列的头部取出一个节点(如果队列不为空),通过 transferForSignal(Node node) 方法将其转移到 AQS 的主同步队列尾部。转移后,该节点的 waitStatus 会被修改,表示它正在等待获取锁。
  4. signalAll(): 将条件队列中的所有节点依次转移到 AQS 的主同步队列中。
  5. 唤醒:节点被转移到主同步队列后,并不会立即被唤醒。唤醒操作通常是由持有锁的线程在释放锁时(调用 unlock() -> release() -> unparkSuccessor())触发,或者在共享模式下由 setHeadAndPropagate 触发。被转移的节点会像其他在同步队列中等待的节点一样参与锁的竞争。
sequenceDiagram
    participant T as Thread
    participant AQS
    participant CondQueue

    T->>AQS: await()
    AQS->>CondQueue: addConditionWaiter()
    AQS->>AQS: fullyRelease()
    loop 等待信号
        AQS->>T: park()
    end

    T->>AQS: signal()
    AQS->>CondQueue: transferAfterSignal()
    AQS->>SyncQueue: enq(node)
    AQS->>T: unpark()

六、 基于 AQS 构建自定义同步器实战

理解了 AQS 的原理后,我们可以尝试构建自己的同步器。关键在于重写 AQS 提供的钩子方法。

需要重写的方法(根据模式选择):

  • protected boolean tryAcquire(int arg) (独占)
  • protected boolean tryRelease(int arg) (独占)
  • protected int tryAcquireShared(int arg) (共享)
  • protected boolean tryReleaseShared(int arg) (共享)
  • protected boolean isHeldExclusively() (判断当前线程是否独占资源,Condition 需要)

6.1 实现一个不可中断、非公平可重入的独占锁 (ReentrantMutex)

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ReentrantMutex implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {
        private Thread owner; // 当前持有锁的线程

        @Override
        protected boolean isHeldExclusively() {
            // 检查状态是否大于0,并且当前线程是否为持有者
            return getState() > 0 && owner == Thread.currentThread();
        }

        @Override
        protected boolean tryAcquire(int acquires) { // acquires 通常为1
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 锁未被持有
                if (compareAndSetState(0, acquires)) {
                    owner = current;
                    return true;
                }
            } else if (owner == current) { // 锁已被当前线程持有 (重入)
              //当时考虑这里要不要使用cas进行改变。但是当前线程是唯一能进入这段代码的线程;所以,此时是“线程安全的上下文”,没有竞争,不需要 CAS
                int nextc = c + acquires;
                if (nextc < 0) { // 溢出检查
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextc); // 直接设置,因为已持有锁,无需CAS
                return true;
            }
            return false; // 其他线程尝试获取已被持有的锁
        }

        @Override
        protected boolean tryRelease(int releases) { // releases 通常为1
            int c = getState() - releases;
            if (owner != Thread.currentThread()) { // 非持有者尝试释放
                throw new IllegalMonitorStateException("Current thread does not hold the lock");
            }
            boolean free = false;
            if (c == 0) { // 完全释放
                free = true;
                owner = null;
            }
            setState(c); // 直接设置
            return free; // 返回是否已完全释放
        }

        // 提供 Condition 支持
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // (可选) 提供获取持有者的方法
        final Thread getOwner() {
            return owner;
        }

        // (可选) 提供获取重入次数的方法
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1); // 不可中断获取
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    // --- Lock接口的其他方法实现 ---
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    // --- 自定义锁特有方法 ---
    public boolean isLocked() {
        return sync.getState() > 0;
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public int getHoldCount() {
        return sync.getHoldCount();
    }
}
  • state 的含义state 在这里表示锁的重入次数。当 state == 0 时,锁未被任何线程持有。
  • owner 字段:新增一个 Thread owner 字段来记录当前持有锁的线程,用于判断重入和防止非持有者释放锁。
  • tryAcquire(int acquires)
  • 如果 state == 0(锁空闲),通过 compareAndSetState(0, acquires) 尝试获取。成功则设置 owner 为当前线程。
  • 如果 state > 0owner == Thread.currentThread()(当前线程已持有锁),则增加 statesetState(c + acquires)),实现重入。注意这里不需要 CAS,因为当前线程已独占访问权。
  • 其他情况(锁被其他线程持有)则返回 false
  • tryRelease(int releases)
  • 首先检查当前线程是否为 owner,如果不是则抛出 IllegalMonitorStateException
  • state 减去 releases
  • 如果 state 减为0,则表示锁已完全释放,将 owner 设为 null,并返回 true(这将触发 AQS 唤醒等待队列中的下一个线程)。否则返回 false(锁仍被当前线程持有)。
  • isHeldExclusively():判断 state > 0owner == Thread.currentThread()。这个方法对于 ConditionObject 的正确工作至关重要。
  • newCondition(): 直接返回 sync.newCondition(),利用 AQS 内建的 ConditionObject

这个 ReentrantMutexSimpleMutex 更完善,因为它正确处理了重入逻辑,这是实际锁中非常重要的特性。它也暴露了 lockInterruptibly, tryLockLock 接口的标准方法,这些方法直接委托给 AQS 的相应实现。

6.2 实现一个简单的计数信号量 (SimpleSemaphore)

一个简单的计数信号量可以使用共享模式:

  • state 表示当前可用的许可数量。

  • tryAcquireShared(int permits):

  • 循环 CAS 尝试减少 state

  • 如果 state 减去 permits 后小于0,则获取失败 (返回负数)。

  • 否则成功,返回 state 减去 permits 后的值 (>=0)。

  • tryReleaseShared(int permits):

  • 循环 CAS 尝试增加 state

  • 始终返回 true (假设增加总能成功)。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleSemaphore {

    private static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // 初始化许可数量
        }

        // 尝试获取共享许可
        @Override
        protected int tryAcquireShared(int acquires) { // acquires 通常为需要获取的许可数
            for (;;) { // 循环 CAS
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining; // 小于0表示失败,>=0表示成功
            }
        }

        // 尝试释放共享许可
        @Override
        protected boolean tryReleaseShared(int releases) { // releases 通常为需要释放的许可数
            for (;;) { // 循环 CAS
                int current = getState();
                int next = current + releases;
                if (next < current) // 溢出检查
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true; // 释放成功
            }
        }
    }

    private final Sync sync;

    public SimpleSemaphore(int permits) {
        if (permits < 0) throw new IllegalArgumentException("permits must be non-negative.");
        sync = new Sync(permits);
    }

    // 获取一个许可 (不可中断)
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // 使用 AQS 的共享模式获取
    }

    // 获取指定数量的许可 (不可中断)
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException("permits must be non-negative.");
        sync.acquireSharedInterruptibly(permits);
    }

    // 释放一个许可
    public void release() {
        sync.releaseShared(1); // 使用 AQS 的共享模式释放
    }

    // 释放指定数量的许可
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException("permits must be non-negative.");
        sync.releaseShared(permits);
    }

    // 获取当前可用许可数量
    public int availablePermits() {
        return sync.getState();
    }
}

在这个 SimpleSemaphore 的实现中:

  • state 变量表示当前可用的许可数量。
  • tryAcquireShared(int acquires) 方法尝试获取指定数量的许可,通过 CAS 减少 state。如果剩余许可数量小于0,表示获取失败;否则获取成功,返回剩余许可数量。
  • tryReleaseShared(int releases) 方法尝试释放指定数量的许可,通过 CAS 增加 state。释放通常总是成功的,所以返回 true
  • acquire()release() 方法分别委托给 AQS 的 acquireSharedInterruptibly()releaseShared() 方法,利用 AQS 的排队和唤醒机制来管理等待许可的线程。