AQS
AQS 是什么?
AQS 是 Java 并发包java.util.concurrent
中的 AbstractQueuedSynchronizer
类的简称。
中文名为 抽象队列同步器。
- AQS 有什么用呢?
AQS 是用来构建 锁 和 同步器的框架,Java 中 ReentrantLock
,CountDownLatch
等并发工具类都是基于 AQS 构建的。
AQS 的数据结构
AQS 内部使用了 volatile state
变量作为资源状态的标识。同时定义了 get/set 方法,和 CAS 替换 state的方法(compareAndSetState)。
AQS 本身实现的是线程排队和阻塞的机制。比如线程等待队列维护(获取资源失败入队/唤醒出队)。它内部使用了一个 FIFO 双端队列。使用 head/tail 标记头/尾节点。线程被存储在Node节点中。
public abstract class AbstractQueuedSynchronizer {
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
Node 节点的数据结构
资源有两种共享模式,或者说两种同步模式
- 独占模式(Exclusive): 资源只能由一个线程获取。如 ReentrantLock
- 共享模式(Share): 资源可以同时被多个线程获取,具体资源数量可以通过参数指定。如 Semaphore、CountDownLatch。
AQS 对这两种共享模式的定义在 Node 中,我们来看一下 Node 节点的数据结构。
static final class Node {
//标记节点 判断是否处于共享模式
static final Node SHARED = new Node();
//标记节点 判断是否处于独占模式
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value 后继节点需要被唤醒 */
static final int SIGNAL = -1;
/** waitStatus value 该节点在等待条件满足 */
static final int CONDITION = -2;
/**
* 共享模式下 表示有资源可用 设置新head节点时,继续唤醒后继节点
*/
static final int PROPAGATE = -3;
//等待状态 取值范围 -3 - 1
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//对应的线程
volatile Thread thread;
//等待队列中 下一个等待节点 只有独占模式下才有效
Node nextWaiter;
// AQS里面的addWaiter私有方法 使用该构造函数创建 Node
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
}
通过 Node 我们可以实现两个队列
- 线程双向队列 CLH
- nextWaiter 实现的等待线程队列(单向队列)
AQS 核心方法
AQS 核心方法是对资源的控制,包括获取和释放资源。
获取资源
独占模式下,获取资源的入口是 acquire(int arg)
, arg
表示要获取资源的个数,独占模式中始终为1.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先通过 tryAcquire
获取资源,这个方法由子类提供具体实现。
如果获取失败,调用 addWaiter
就将 当前线程 加入到等待队列。
通过 CAS 来尝试加入到队尾,失败的话,进入循环,自旋 CAS 插入, 代码如下:
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}
// 自旋CAS插入等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
我们已经通过 addWaiter
将当前线程放到了等待队列队尾。而处于等待队列的节点是从头节点一个一个获取资源的。我们来看 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是 head ,当前节点可以尝试去获取资源了
if (p == head && tryAcquire(arg)) {
//拿到资源后,设为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果可以休息 通过 park 进入 WAITING状态,直到被 unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
节点进入等待队列后,调用 park
进入 WAITING
状态。只有头节点线程是活跃的。
释放资源
独占模式下,释放资源使用 release(int arg)
方法,tryRelease
由子类提供实现, unpark
队列中第一个等待状态的线程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 如果状态是负数,尝试把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到头结点的后继结点head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于0
// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待队列中所有还有用的结点,都向前移动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,
if (s != null)
LockSupport.unpark(s.thread);
}