Sync内部类
CountDownLatch的同步机制是通过其内部静态类Sync实现的,它继承了AQS。Sync类使用AQS的状态来表示计数器的值。

查看代码
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}主要方法
Sync(int count): 构造函数,初始化AQS的状态为指定的计数值。int getCount(): 获取当前AQS的状态,即计数器的值。protected int tryAcquireShared(int acquires): 尝试获取共享锁。如果计数器为0,返回1表示获取成功;否则返回-1表示获取失败。protected boolean tryReleaseShared(int releases): 尝试释放共享锁。递减计数器的值,并在计数器到达0时返回true,表示可以唤醒所有等待的线程。
主要方法
await方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}await()方法其实调用的是一个来自其内部类Sync的acquireSharedInterruptibly()方法,这个方法是在AbstractQueuedSynchronizer(AQS)中定义的。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}这段代码是AQS中的acquireSharedInterruptibly()方法,它首先检查当前线程是否已经被中断。如果已经被中断,则抛出InterruptedException。如果没有被中断,它调用tryAcquireShared()方法尝试获取共享锁。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}这段代码是CountDownLatch内部类Sync中的tryAcquireShared()方法。它检查AQS的状态(即计数器的值)是否为0。如果为0,返回1表示获取共享锁成功;如果不为0,返回-1表示获取失败。
acquire方法在介绍ReentrantLock时有详细讲解。
总的来说,执行流程如下:
- 检查中断状态:
await()方法开始时,首先检查当前线程是否已经被中断。如果是,则抛出InterruptedException。 - 尝试获取共享锁:如果线程没有被中断,
await()方法会调用tryAcquireShared()方法尝试获取共享锁。这会检查计数器是否为0。 - 获取成功:如果计数器为0,
tryAcquireShared()返回1,表示获取共享锁成功,线程可以继续执行。 - 获取失败:如果计数器大于0,
tryAcquireShared()返回-1,表示获取共享锁失败。这时,线程会被阻塞,并放入AQS的队列中等待。 - 等待唤醒:线程会一直处于等待状态,直到其他线程调用
countDown()方法将计数器减到0,或者线程被中断。 - 唤醒后重新尝试获取锁:当线程被唤醒后,它会重新尝试获取共享锁。如果计数器为0,则获取成功,线程继续执行;如果计数器仍然大于0,线程会再次被阻塞。
- 响应中断:在整个等待过程中,如果线程被中断,它会抛出
InterruptedException。
await(timeout,unit)方法
await和`await(long timeout, TimeUnit unit)方法类似,是两种等待计数器到达零的方法。它们的主要区别在于等待时间的限制和返回值。
await方法会无限期等待,而await(long timeout, TimeUnit unit)方法有一个明确的等待时间限制。await方法在等待线程被中断时会抛出异常,而await(long timeout, TimeUnit unit)方法在等待时间超时时会返回false。await方法没有返回值,而await(long timeout, TimeUnit unit)方法会返回一个布尔值,表示是否成功等待计数器到达零。
countDown方法
CountDownLatch用于递减计数器的值,并在计数器到达0时唤醒所有等待的线程。
1. 调用countDown方法
外部代码通过调用countDownLatch.countDown()来减少计数器的值。这个调用实际上是委托给内部的sync对象来处理的。
public void countDown() {
sync.releaseShared(1);
}2. 尝试释放共享锁
countDown方法内部调用了sync.releaseShared(1),这会尝试减少计数器的值。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}3. 尝试更新状态
在releaseShared方法中,首先调用tryReleaseShared(arg)来尝试更新状态。如果更新成功,并且计数器变为零,那么它将唤醒所有等待的线程。
查看代码
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}4. 循环以确保状态更新
tryReleaseShared方法使用一个无限循环来确保状态更新的原子性。它首先获取当前状态值c,然后尝试将状态值减少1得到nextc。
5. 原子操作更新状态
使用compareAndSetState(c, nextc)方法尝试原子地将状态从c更新到nextc。这个方法是AQS中的原子操作,只有当当前状态确实是c时,状态才会被更新为nextc。
6. 检查是否需要唤醒线程
如果状态更新成功,tryReleaseShared方法会检查更新后的状态nextc是否为0。如果为0,表示所有的计数都已经完成,此时返回true。
返回true将执行signalNext方法。
signalNext方法的执行流程:
- 获取头节点的后继节点:方法首先检查传入的头节点
h是否为null,如果不是,则获取头节点的下一个节点s。 - 检查后继节点的状态:如果后继节点
s不为null且其状态s.status不为0,这通常表示节点处于等待状态(例如,WAITING状态)。 - 更新后继节点的状态:调用
s.getAndUnsetStatus(WAITING)来原子地更新后继节点的状态。这个方法可能是用来将节点的状态从等待状态更新为其他状态,以表示线程不再等待。 - 唤醒后继节点的线程:调用
LockSupport.unpark(s.waiter)来唤醒后继节点s中保存的线程waiter。这会使得该线程的park调用返回,这样线程就可以继续执行了。
