AQS

AQS 是什么?

AQS 是 Java 并发包java.util.concurrent中的 AbstractQueuedSynchronizer 类的简称。

中文名为 抽象队列同步器。

  • AQS 有什么用呢?

AQS 是用来构建 锁 和 同步器的框架,Java 中 ReentrantLockCountDownLatch等并发工具类都是基于 AQS 构建的。

AQS 的数据结构

AQS 内部使用了 volatile state 变量作为资源状态的标识。同时定义了 get/set 方法,和 CAS 替换 state的方法(compareAndSetState)。

AQS 本身实现的是线程排队和阻塞的机制。比如线程等待队列维护(获取资源失败入队/唤醒出队)。它内部使用了一个 FIFO 双端队列。使用 head/tail 标记头/尾节点。线程被存储在Node节点中。

public abstract class AbstractQueuedSynchronizer {
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}

Node 节点的数据结构

资源有两种共享模式,或者说两种同步模式

  • 独占模式(Exclusive): 资源只能由一个线程获取。如 ReentrantLock
  • 共享模式(Share): 资源可以同时被多个线程获取,具体资源数量可以通过参数指定。如 Semaphore、CountDownLatch。

AQS 对这两种共享模式的定义在 Node 中,我们来看一下 Node 节点的数据结构。

static final class Node {
    //标记节点 判断是否处于共享模式
    static final Node SHARED = new Node();

    //标记节点 判断是否处于独占模式
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;

    /** waitStatus value 后继节点需要被唤醒 */
    static final int SIGNAL = -1;

    /** waitStatus value 该节点在等待条件满足 */
    static final int CONDITION = -2;

    /**
     * 共享模式下 表示有资源可用 设置新head节点时,继续唤醒后继节点
     */
    static final int PROPAGATE = -3;

    //等待状态 取值范围 -3 - 1
    volatile int waitStatus;

    //前驱节点
    volatile Node prev;

    //后继节点
    volatile Node next;

    //对应的线程
    volatile Thread thread;

    //等待队列中 下一个等待节点 只有独占模式下才有效
    Node nextWaiter;

    // AQS里面的addWaiter私有方法 使用该构造函数创建 Node
    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
}

通过 Node 我们可以实现两个队列

  • 线程双向队列 CLH
  • nextWaiter 实现的等待线程队列(单向队列)

AQS 核心方法

AQS 核心方法是对资源的控制,包括获取和释放资源。

获取资源

独占模式下,获取资源的入口是 acquire(int arg), arg 表示要获取资源的个数,独占模式中始终为1.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先通过 tryAcquire获取资源,这个方法由子类提供具体实现。

如果获取失败,调用 addWaiter就将 当前线程 加入到等待队列。

通过 CAS 来尝试加入到队尾,失败的话,进入循环,自旋 CAS 插入, 代码如下:

private Node addWaiter(Node mode) {
    // 生成该线程对应的Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将Node插入队列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 使用CAS尝试,如果成功就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果等待队列为空或者上述CAS失败,再自旋CAS插入
    enq(node);
    return node;
}

// 自旋CAS插入等待队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

我们已经通过 addWaiter将当前线程放到了等待队列队尾。而处于等待队列的节点是从头节点一个一个获取资源的。我们来看 acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 如果前驱节点是 head ,当前节点可以尝试去获取资源了
                if (p == head && tryAcquire(arg)) {
                    //拿到资源后,设为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果可以休息 通过 park 进入 WAITING状态,直到被 unpark
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

节点进入等待队列后,调用 park进入 WAITING状态。只有头节点线程是活跃的。

释放资源

独占模式下,释放资源使用 release(int arg) 方法,tryRelease由子类提供实现, unpark 队列中第一个等待状态的线程。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
    // 如果状态是负数,尝试把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 得到头结点的后继结点head.next
    Node s = node.next;
    // 如果这个后继结点为空或者状态大于0
    // 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 等待队列中所有还有用的结点,都向前移动
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继结点不为空,
    if (s != null)
        LockSupport.unpark(s.thread);
}
Last Updated:
Contributors: himcs