ReadWriteLock
接口
ReadWriteLock
接口提供了读锁(共享锁)和写锁(排他锁)两种锁。读锁可以被多个读线程同时持有,而写锁一次只能被一个线程持有。ReadWriteLock
的主要实现类是ReentrantReadWriteLock
。
ReentrantReadWriteLock
类
ReentrantReadWriteLock
实现了ReadWriteLock
接口,它内部维护了两把锁:一把读锁和一把写锁。读锁和写锁都实现了Lock
接口,它们都可以像ReentrantLock
一样使用。ReentrantReadWriteLock
允许读线程并发访问,但在写线程访问时会阻塞所有读线程和写线程。
ReentrantReadWriteLock
是一个可重入的读写锁,允许多个线程同时持有读锁,但只允许一个线程持有写锁。当写锁被占用时,其他线程不能获取读锁或写锁;当读锁被占用时,其他线程可以获取读锁,但不能获取写锁。
这个类的主要特点和实现机制如下:
- 内部锁状态: 锁的状态被编码为一个整数,高16位表示共享锁(读锁)的计数,低16位表示独占锁(写锁)的计数。
- 公平性策略: 通过
FairSync
和NonfairSync
两个内部类,实现了公平锁和非公平锁两种策略。公平锁按照请求锁的顺序来分配,非公平锁允许线程插队。 - 重入性: 线程可以多次获取读锁或写锁,而不会造成死锁。锁的持有计数确保了重入。
- 锁的获取和释放:
tryAcquireShared
和tryReleaseShared
方法用于获取和释放读锁。tryAcquire
和tryRelease
方法用于获取和释放写锁。这些方法通过CAS操作来更新锁的状态。 - 条件变量: 通过
newCondition
方法创建条件变量,线程可以在条件变量上等待或唤醒。 - 锁的计数和所有权: 提供了方法来查询锁的持有计数和当前锁的持有线程。
- 线程局部存储: 使用
ThreadLocal
来存储每个线程的读锁计数,这避免了每次都需要查询线程的局部存储。 - 锁的尝试获取:
tryWriteLock
和tryReadLock
方法提供了非阻塞的锁获取方式,如果锁不可用,则立即返回false。
内部实现类
Sync
类是ReentrantReadWriteLock
实现的核心,它通过内部状态的管理,提供了对读写锁的精细控制。通过将状态分为读锁和写锁的计数,它能够同时允许多个读线程和单个写线程的安全访问。公平性和非公平性的区别在于线程在请求锁时是否应该阻塞,以及它们如何进入等待队列。Sync
类被进一步分为公平(Fair
)和非公平(Nonfair
)两个版本,这两个版本在处理锁争用时有所不同。ReentrantReadWriteLock
类的两个内部实现类:ReadLock
和WriteLock
的定义。这两个类实现了Lock
接口,用于获取和释放读锁和写锁。
Sync
内部类
内部类 Sync
是 AbstractQueuedSynchronizer
(AQS)的一个抽象子类,用于实现读写锁的核心同步机制。
锁状态表示
Sync
使用一个整数值来表示锁的状态,该整数值被逻辑上分为两部分:
- 高16位:表示共享锁(读锁)的持有计数。
- 低16位:表示排他锁(写锁)的持有计数。
通过位移和掩码操作,可以分别获取读锁和写锁的计数。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读写锁的获取与释放
- 写锁的获取与释放:通过
tryAcquire
和tryRelease
方法实现。写锁是排他锁,一次只能由一个线程持有。
protected final boolean tryAcquire(int acquires) {
// ... 省略具体实现 ...
}
protected final boolean tryRelease(int releases) {
// ... 省略具体实现 ...
}
- 读锁的获取与释放:通过
tryAcquireShared
和tryReleaseShared
方法实现。读锁是共享锁,可以被多个线程同时持有。
protected final int tryAcquireShared(int unused) {
// ... 省略具体实现 ...
}
protected final boolean tryReleaseShared(int unused) {
// ... 省略具体实现 ...
}
公平性与非公平性
Sync
类有两个抽象方法 readerShouldBlock
和 writerShouldBlock
,分别用于确定在公平锁和非公平锁中,读线程和写线程是否应该阻塞。这些方法由子类 FairSync
和 NonfairSync
实现。
查看代码
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
锁降级
锁降级在 Sync
中是通过先获取读锁,然后释放写锁来实现的。这个操作确保了在释放写锁之后,线程仍然持有读锁,从而可以继续访问数据而不会被其他写线程干扰。
条件变量
Sync
通过 newCondition
方法支持条件变量的使用,这允许线程在锁状态下等待特定条件。
final ConditionObject newCondition() {
return new ConditionObject();
}
其他方法
Sync
还提供了其他方法来查询锁的状态,例如获取锁的持有者、读锁和写锁的计数等。
查看代码
final Thread getOwner() {
// ... 省略具体实现 ...
}
final int getReadLockCount() {
// ... 省略具体实现 ...
}
final boolean isWriteLocked() {
// ... 省略具体实现 ...
}
final int getWriteHoldCount() {
// ... 省略具体实现 ...
}
final int getReadHoldCount() {
// ... 省略具体实现 ...
}
ReadLock
类
不支持条件变量:
newCondition()
:由于读锁不支持条件变量,该方法抛出UnsupportedOperationException
。
锁的状态描述:
toString()
:返回一个描述读锁状态的字符串,包括持有的读锁数量。
tryReadLock()
流程图:
WriteLock
类
支持条件变量:
newCondition()
:返回一个新的条件变量,用于在写锁上进行等待和通知操作。
锁的状态描述:
toString()
:返回一个描述写锁状态的字符串,包括是否被锁定以及持有写锁的线程名称。
锁降级
锁降级(Lock Downgrading)是一种在多线程环境下使用同步锁时采用的技术,其目的是在获取了较高级别的锁之后,能够将其降级为较低级别的锁。
在ReentrantReadWriteLock
中,锁降级是通过内部机制来实现的,不需要用户手动管理锁的获取和释放顺序。
锁降级的过程涉及以下步骤:
- 持有写锁:线程首先必须持有写锁。
- 执行写入操作:在持有写锁的情况下,线程执行数据更新等写入操作。
- 获取读锁:在写入操作完成后,线程在释放写锁之前获取读锁。
- 释放写锁:释放写锁,此时线程持有读锁。
以下是 tryAcquireShared
和 fullTryAcquireShared
方法中与锁降级相关的部分:
查看代码
protected final int tryAcquireShared(int unused) {
// ... 省略其他代码 ...
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; // 如果有其他线程持有写锁,则获取读锁失败
// ... 省略其他代码 ...
}
final int fullTryAcquireShared(Thread current) {
// ... 省略其他代码 ...
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1; // 如果当前线程不持有写锁,则获取读锁失败
// 如果当前线程持有写锁,则可以继续尝试获取读锁
}
// ... 省略其他代码 ...
}
锁降级的体现在于,如果当前线程已经持有了写锁(即 exclusiveCount(c) != 0
且 getExclusiveOwnerThread() == current
),那么它可以安全地继续获取读锁。这是因为在 ReentrantReadWriteLock
中,允许同一个线程在持有写锁的情况下获取读锁,而不需要先释放写锁。
在锁降级的实际操作中,以下代码段展示了如何在不释放写锁的情况下获取读锁:
查看代码
// 假设当前线程已经持有了写锁
writeLock.lock();
try {
// 执行写入操作
// ...
// 锁降级:在持有写锁的情况下获取读锁
readLock.lock();
} finally {
// 释放写锁,但仍然持有读锁
writeLock.unlock();
}
// 继续执行读取操作,此时持有读锁
try {
// ...
} finally {
// 最后释放读锁
readLock.unlock();
}
在上述代码中,线程首先获取写锁,执行写入操作,然后获取读锁,最后释放写锁。在这个过程中,线程始终保持着对资源的锁定,从而确保了数据的一致性和可见性。
总之,tryAcquireShared
和 fullTryAcquireShared
方法中的逻辑确保了持有写锁的线程可以获取读锁,这是锁降级实现的关键部分。通过这种方式,ReentrantReadWriteLock
支持了锁的降级,使得线程可以在不释放写锁的情况下增加对资源的共享度。
Sync
类中体现锁降级的方法tryAcquireShared
和fullTryAcquireShared
的源码如下源码
java@ReservedStackAccess protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取当前状态 int c = getState(); // 如果写锁被其他线程持有,则返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 获取读锁的持有计数 int r = sharedCount(c); // 如果当前线程不应该阻塞,且读锁计数未达到最大值,则尝试通过CAS更新状态和计数 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 如果读锁计数为0,则设置当前线程为第一个读线程,并初始化读锁持有计数 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 如果当前线程已经是第一个读线程,则增加读锁持有计数 firstReaderHoldCount++; } else { // 如果当前线程不是第一个读线程,则获取或创建HoldCounter对象,并增加读锁持有计数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } // 返回1表示成功获取读锁 return 1; } // 如果上述尝试失败,则返回fullTryAcquireShared方法的返回值 return fullTryAcquireShared(current); }
javafinal int fullTryAcquireShared(Thread current) { // 初始化HoldCounter对象 HoldCounter rh = null; // 进入无限循环,直到成功获取读锁 for (;;) { // 获取当前状态 int c = getState(); // 如果写锁被其他线程持有,且持有者不是当前线程,则返回-1 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 如果当前线程持有写锁,则阻塞会导致死锁,所以直接返回-1 // 注意,这行代码永远不会执行,因为前面的条件判断已经排除了这种情况 } else if (readerShouldBlock()) { // 确保当前线程不是重入获取读锁 if (firstReader == current) { // 确保第一个读线程的读锁持有计数大于0 } else { // 如果当前线程不是第一个读线程,则获取或创建HoldCounter对象,并检查计数是否为0 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 如果读锁计数达到最大值,则抛出错误 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 使用CAS尝试更新状态和计数 if (compareAndSetState(c, c + SHARED_UNIT)) { // 如果读锁计数为0,则设置当前线程为第一个读线程,并初始化读锁持有计数 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 如果当前线程已经是 firstReaderHoldCount++; } else { // 如果当前线程不是第一个读线程,则获取或创建HoldCounter对象,并增加读锁持有计数 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } // 返回1表示成功获取读锁 return 1; } } // 循环尝试获取读锁,直到成功或失败 } }