AbstractQueuedSynchronizer
(AQS)是一个用于构建锁和同步器的框架,它提供了一种实现阻塞和条件等待的方法。基于AQS,JDK实现了各种同步器,如ReentrantLock
、Semaphore
、CountDownLatch
等。
AQS的设计哲学是使用队列来管理等待的线程,并通过一个内部类Node
来表示等待中的线程。
AQS的核心思想是使用一个双向链表来管理等待的线程,这个链表被称为等待队列。每个节点Node
在链表中都有自己的位置,并且每个节点都包含了一个线程引用、一个等待状态和一个分别指向前后一个节点的引用。
以下是AQS的一些关键特性:
- 独占模式(Exclusive):当锁处于独占模式时,只有一个线程可以获取锁。AQS通过
Node
中的waitStatus
字段来表示线程的状态,如CANCELLED
表示线程已经取消了等待。 - 共享模式(Shared):当锁处于共享模式时,多个线程可以同时获取锁。AQS通过
Node
中的prev
和next
字段来表示线程的顺序,从而实现了先进先出(FIFO)的等待队列。 - 获取和释放锁:AQS提供了
acquire
和release
方法,用于线程获取和释放锁。这些方法内部通过tryAcquire
和tryRelease
方法来处理具体的锁获取和释放逻辑。 - 条件等待和通知:AQS提供了
Condition
接口,用于线程间的条件等待和通知。每个Condition
对象都关联着一个等待队列,当线程调用await
方法时,它会加入到等待队列中;当其他线程调用signal
方法时,等待队列中的线程会被唤醒并重新尝试获取锁。 - 公平锁和非公平锁:AQS支持公平锁和非公平锁。公平锁保证了等待时间最长的线程能够先获取锁,而非公平锁则允许线程根据其优先级和等待时间来获取锁。
- 阻塞和唤醒线程:AQS提供了
acquireInterruptibly
和release
方法,用于阻塞和唤醒线程。当线程在等待锁时,如果它被中断,它会立即释放锁并抛出InterruptedException
。
内部类Node
Node
类的定义在 AQS 中,是 AQS 同步队列和条件队列的基础,用于表示在同步器上等待的线程。每个 Node
包含线程的等待状态和队列中的前后节点引用。
Node的结构
属性:
volatile Node prev;
: 前一个节点的引用,用于维护队列的链接。volatile
关键字确保了多线程环境下的内存可见性。volatile Node next;
: 下一个节点的引用,同样用于维护队列的链接。Thread waiter;
: 等待中的线程。volatile int status;
: 表示节点的状态,如等待、取消、条件等待等。WAITING:这个状态位表示节点正在等待。
CANCELLED:这个状态位表示节点已经被取消。
COND:这个状态位表示节点目前在条件队列中等待。
方法:
casPrev(Node c, Node v)
: 使用比较并交换(CAS)操作来设置前一个节点。casNext(Node c, Node v)
: 使用 CAS 操作来设置下一个节点。getAndUnsetStatus(int v)
: 原子地清除状态位。setPrevRelaxed(Node p)
: 不使用 CAS 的方式设置前一个节点,通常用于节点不在队列中的情况。setStatusRelaxed(int s)
: 不使用 CAS 的方式设置状态,通常用于节点不在队列中的情况。clearStatus()
: 清除状态字段,用于减少不必要的信号通知。
静态字段:
STATUS
,NEXT
,PREV
: 分别代表status
,next
,prev
字段的偏移量,用于在底层进行原子操作。
子类:
ExclusiveNode
: 表示独占模式的节点,用于例如ReentrantLock
。SharedNode
: 表示共享模式的节点,用于例如Semaphore
和CountDownLatch
。ConditionNode
: 表示条件队列中的节点,它还实现了ForkJoinPool.ManagedBlocker
接口,允许条件等待在ForkJoinPool
中使用。
Node的具体实现
ExclusiveNode
、SharedNode
和 ConditionNode
是三个具体的 Node
子类,它们分别用于表示独占锁、共享锁和条件队列中的节点。这些类在 AQS 中用于实现不同的同步语义。
ExclusiveNode
继承自Node
,是一个具体实现类,用于表示独占模式下的节点。在独占锁(如ReentrantLock
)中,每次只能有一个线程持有锁,因此节点是独占的。SharedNode
同样继承自Node
,用于表示共享模式下的节点。在共享锁(如ReentrantReadWriteLock
的读锁)中,多个线程可以同时获取锁。ConditionNode
继承自Node
并实现了ForkJoinPool.ManagedBlocker
接口,用于支持条件队列。条件队列是锁的一个附属结构,它允许线程在某个条件不满足时挂起,并在条件满足时被唤醒。ConditionNode nextWaiter;
:指向条件队列中下一个等待节点的引用。isReleasable
:返回当前线程是否可以被释放,即等待条件是否满足或者线程是否被中断。block
:实现ManagedBlocker
接口的方法,用于在条件队列中挂起当前线程,直到它被唤醒或中断。
AQS类属性
head: 表示同步队列的头部节点。
tail:表示同步队列的尾部节点。与 head
类似,tail
也是 volatile
类型的,确保了内存可见性。尾部节点在队列初始化后,只通过 casTail
方法(一个原子性的比较并交换操作)来修改。当新的节点加入队列时,它们会被添加到队列的尾部。
state: 这个属性表示同步器的状态。
AQS重要方法
acquire
方法
这个方法是 AQS 中所有公开的 acquire
方法调用的入口点,用于尝试获取同步状态。
查看代码
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
参数说明:
node
: 当前线程对应的节点。如果是重新获取条件的线程,node
可能为null
。arg
: 要获取的同步状态的数量。shared
: 如果为true
,表示获取共享锁;如果为false
,表示获取独占锁。interruptible
: 如果为true
,表示在等待过程中可以被中断;如果为false
,表示等待过程中不能被中断。timed
: 如果为true
,表示使用定时等待;如果为false
,表示使用非定时等待。time
: 如果使用定时等待,表示超时时间。
初始化变量:
current
存储当前线程。spins
和postSpins
用于控制自旋次数。interrupted
用于标记线程是否被中断。first
用于标记节点是否为队列中的第一个节点。pred
用于存储前驱节点。
无限循环:
acquire
方法使用了一个无限循环来处理锁的获取。线程会一直尝试获取锁,直到成功、被中断或超时。
检查队列状态:
- 如果节点不是队列中的第一个节点,且前驱节点存在,会检查前驱节点的状态。如果前驱节点被取消了,会清理队列。如果前驱节点的状态正常,会进行自旋等待。
尝试获取锁:
- 如果节点是队列中的第一个节点或者还没有入队,会尝试获取锁。这通过调用
tryAcquire
或tryAcquireShared
方法来完成,取决于锁是共享模式还是独占模式。
创建节点:
- 如果节点尚未创建,会根据锁的模式创建一个新的节点。
入队:
- 如果节点尚未入队,会尝试将节点加入队列的尾部。这涉及到设置节点的
waiter
字段和更新队列的tail
指针。
自旋和等待:
- 如果节点是队列中的第一个节点,会进行自旋等待。这是为了减少线程在等待锁时的不公平性。
- 如果节点的状态是
WAITING
,会清除状态并重新尝试获取锁。 - 如果线程被唤醒或者超时,会根据情况设置自旋次数并重新尝试获取锁。
响应中断和超时:
- 如果线程在等待过程中被中断,会根据
interruptible
标志来决定是否抛出InterruptedException
。 - 如果是支持超时的锁请求,线程会在指定的时间内等待锁。如果等待超时,线程会退出循环并返回。
取消等待:
- 如果线程在等待过程中被其他线程唤醒或者等待超时,会尝试再次获取锁。如果成功,线程会从队列中移除并继续执行;如果失败,会根据中断状态和是否可取消来决定是否抛出异常或返回。
返回结果:
- 如果成功获取锁,返回1;如果超时,返回0;如果被中断,返回-1。
tryAcquire
方法
tryAcquire
方法用于尝试以独占模式获取同步状态,它允许子类实现自己的同步逻辑。
方法用途:
tryAcquire
方法允许子类以独占模式尝试获取同步状态。独占模式只允许一个线程持有锁。
参数说明:
arg
: 要获取的同步状态的数量。这个值是传递给acquire
或tryAcquire
方法的参数,也可以是在进入条件等待时保存的值。
默认实现:
- 默认实现抛出一个
UnsupportedOperationException
,因为独占模式是可选的,不是所有同步器都支持。
同步状态管理:
- 如果子类覆盖了这个方法,它需要确保同步器处于一个合法的状态。
- 子类必须以一种一致的方式抛出
IllegalMonitorStateException
,以确保同步的正确工作。
线程排队:
- 如果
tryAcquire
方法返回false
,表示获取失败,此时acquire
方法可能会将线程加入到等待队列中。 - 如果
tryAcquire
方法返回true
,表示获取成功,线程可以继续执行。
tryAcquireShared
方法
tryAcquireShared
方法用于尝试以共享模式获取同步状态,它允许子类实现自己的同步逻辑。与 tryAcquire
方法类似,区别在于下面三种:
- 获取模式:
tryAcquireShared
用于共享模式,tryAcquire
用于独占模式。 - 返回值:
tryAcquireShared
返回三个不同的值,而tryAcquire
只返回true
或false
。 - 适用场景:
tryAcquireShared
适用于允许多个线程同时访问的场景,如Semaphore
或ReadWriteLock
, 而tryAcquire
适用于只有一个线程可以访问的场景,如ReentrantLock
。
release
方法
release
方法用于在独占模式下释放同步状态。
方法用途:
release
方法用于在独占模式下释放同步状态。当持有锁的线程完成其工作并释放锁时,会调用这个方法。
默认实现:
- 默认实现直接调用
tryRelease
方法,并尝试释放锁。如果tryRelease
返回true
,表示至少有一个线程可以被释放,此时会调用signalNext(head)
方法来唤醒队列中的下一个节点。
线程排队:
- 如果
tryRelease
返回true
,表示可以释放至少一个线程。此时,会调用signalNext(head)
方法来唤醒队列中的下一个节点。 - 如果
tryRelease
返回false
,表示没有线程可以被释放,此时不会做任何事情。
signalNext
静态方法
signalNext
方法,用于唤醒等待队列中的下一个节点。
方法用途:
signalNext
方法用于唤醒等待队列中的下一个节点。当持有锁的线程完成其工作并释放锁时,会调用这个方法来唤醒等待队列中的下一个节点。
方法逻辑:
- 首先检查头节点
h
是否为null
,如果为null
,则不进行任何操作。 - 如果头节点不为
null
,则检查头节点的下一个节点s
是否为null
,如果为null
,则不进行任何操作。 - 如果下一个节点不为
null
,并且其状态不是WAITING
(即节点没有被取消),则尝试唤醒这个节点。 - 唤醒节点的步骤包括:
- 原子性地清除节点状态,使其不再处于等待状态。
- 使用
LockSupport.unpark(s.waiter)
方法来唤醒节点的等待线程。