Sync
内部类
CountDownLatch
的同步机制是通过其内部静态类Sync
实现的,它继承了AQS。Sync
类使用AQS的状态来表示计数器的值。
查看代码
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
主要方法
Sync(int count)
: 构造函数,初始化AQS的状态为指定的计数值。int getCount()
: 获取当前AQS的状态,即计数器的值。protected int tryAcquireShared(int acquires)
: 尝试获取共享锁。如果计数器为0,返回1表示获取成功;否则返回-1表示获取失败。protected boolean tryReleaseShared(int releases)
: 尝试释放共享锁。递减计数器的值,并在计数器到达0时返回true,表示可以唤醒所有等待的线程。
主要方法
await
方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await()
方法其实调用的是一个来自其内部类Sync
的acquireSharedInterruptibly()
方法,这个方法是在AbstractQueuedSynchronizer
(AQS)中定义的。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
这段代码是AQS中的acquireSharedInterruptibly()
方法,它首先检查当前线程是否已经被中断。如果已经被中断,则抛出InterruptedException
。如果没有被中断,它调用tryAcquireShared()
方法尝试获取共享锁。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
这段代码是CountDownLatch
内部类Sync
中的tryAcquireShared()
方法。它检查AQS的状态(即计数器的值)是否为0。如果为0,返回1表示获取共享锁成功;如果不为0,返回-1表示获取失败。
acquire
方法在介绍ReentrantLock
时有详细讲解。
总的来说,执行流程如下:
- 检查中断状态:
await()
方法开始时,首先检查当前线程是否已经被中断。如果是,则抛出InterruptedException
。 - 尝试获取共享锁:如果线程没有被中断,
await()
方法会调用tryAcquireShared()
方法尝试获取共享锁。这会检查计数器是否为0。 - 获取成功:如果计数器为0,
tryAcquireShared()
返回1,表示获取共享锁成功,线程可以继续执行。 - 获取失败:如果计数器大于0,
tryAcquireShared()
返回-1,表示获取共享锁失败。这时,线程会被阻塞,并放入AQS的队列中等待。 - 等待唤醒:线程会一直处于等待状态,直到其他线程调用
countDown()
方法将计数器减到0,或者线程被中断。 - 唤醒后重新尝试获取锁:当线程被唤醒后,它会重新尝试获取共享锁。如果计数器为0,则获取成功,线程继续执行;如果计数器仍然大于0,线程会再次被阻塞。
- 响应中断:在整个等待过程中,如果线程被中断,它会抛出
InterruptedException
。
await(timeout,unit)
方法
await
和`await(long timeout, TimeUnit unit)方法类似,是两种等待计数器到达零的方法。它们的主要区别在于等待时间的限制和返回值。
await
方法会无限期等待,而await(long timeout, TimeUnit unit)
方法有一个明确的等待时间限制。await
方法在等待线程被中断时会抛出异常,而await(long timeout, TimeUnit unit)
方法在等待时间超时时会返回false
。await
方法没有返回值,而await(long timeout, TimeUnit unit)
方法会返回一个布尔值,表示是否成功等待计数器到达零。
countDown
方法
CountDownLatch
用于递减计数器的值,并在计数器到达0时唤醒所有等待的线程。
1. 调用countDown方法
外部代码通过调用countDownLatch.countDown()
来减少计数器的值。这个调用实际上是委托给内部的sync
对象来处理的。
public void countDown() {
sync.releaseShared(1);
}
2. 尝试释放共享锁
countDown
方法内部调用了sync.releaseShared(1)
,这会尝试减少计数器的值。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
3. 尝试更新状态
在releaseShared
方法中,首先调用tryReleaseShared(arg)
来尝试更新状态。如果更新成功,并且计数器变为零,那么它将唤醒所有等待的线程。
查看代码
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
4. 循环以确保状态更新
tryReleaseShared
方法使用一个无限循环来确保状态更新的原子性。它首先获取当前状态值c
,然后尝试将状态值减少1得到nextc
。
5. 原子操作更新状态
使用compareAndSetState(c, nextc)
方法尝试原子地将状态从c
更新到nextc
。这个方法是AQS中的原子操作,只有当当前状态确实是c
时,状态才会被更新为nextc
。
6. 检查是否需要唤醒线程
如果状态更新成功,tryReleaseShared
方法会检查更新后的状态nextc
是否为0。如果为0,表示所有的计数都已经完成,此时返回true
。
返回true
将执行signalNext
方法。
signalNext
方法的执行流程:
- 获取头节点的后继节点:方法首先检查传入的头节点
h
是否为null
,如果不是,则获取头节点的下一个节点s
。 - 检查后继节点的状态:如果后继节点
s
不为null
且其状态s.status
不为0,这通常表示节点处于等待状态(例如,WAITING
状态)。 - 更新后继节点的状态:调用
s.getAndUnsetStatus(WAITING)
来原子地更新后继节点的状态。这个方法可能是用来将节点的状态从等待状态更新为其他状态,以表示线程不再等待。 - 唤醒后继节点的线程:调用
LockSupport.unpark(s.waiter)
来唤醒后继节点s
中保存的线程waiter
。这会使得该线程的park
调用返回,这样线程就可以继续执行了。