ReentrantLock
是一个实现了Lock
接口的类,它是一个可重入锁,意味着同一个线程可以多次获取同一把锁而不会发生死锁。
ReentrantLock
实现
ReentrantLock
包含三个内部类:基于AQS(AbstractQueuedSynchronizer)实现的Sync
抽象类,以及继承自Sync
的NonfairSync
和FairSync
类。
ReentrantLock
通过一个final Sync
属性来管理锁的获取和释放,这个属性实例化为NonfairSync
或FairSync
,以支持非公平锁或公平锁的行为。
Sync
抽象类
Sync
类是 ReentrantLock
的核心,它通过 AQS 提供了锁的基本功能。Sync
定义了如何尝试获取和释放锁,以及如何处理重入、公平性和中断等复杂情况。
方法名称 | 描述 |
---|---|
tryLock() | 尝试以非公平的方式获取锁。如果锁未被占用或当前线程已经持有锁,则获取锁并返回true ;否则返回false 。 |
initialTryLock() | 抽象方法,由子类实现,用于在尝试获取锁之前进行初步尝试。这个方法的具体实现取决于锁是公平还是非公平。 |
lock() | 获取锁,如果锁不可用,则当前线程将等待直到锁可用。先尝试立即获取锁,如果失败则调用 acquire(1) 进行排队。 |
lockInterruptibly() | 获取锁,但如果当前线程被中断,则抛出 InterruptedException 。先检查中断状态,然后尝试立即获取锁,如果失败则调用 acquireInterruptibly(1) 。 |
tryLockNanos(long nanos) | 尝试获取锁,最多等待指定的纳秒数。如果超时或当前线程被中断,则返回false 。先检查中断状态,然后尝试立即获取锁,如果失败则调用 tryAcquireNanos(1, nanos) 。 |
tryRelease(int releases) | 尝试释放锁。减少状态计数器,如果当前线程不是锁的持有者则抛出 IllegalMonitorStateException 。如果状态计数器减少到0,则清空独占锁的所有者,并返回true 表示完全释放了锁。 |
isHeldExclusively() | 判断当前线程是否独占地持有锁。检查当前线程是否是独占锁的所有者。 |
newCondition() | 创建一个新的 ConditionObject 实例,该实例与当前锁关联。 |
getOwner() | 返回当前持有锁的线程,如果没有线程持有锁,则返回null 。 |
getHoldCount() | 返回当前线程对锁的重入次数,如果不是当前线程持有锁,则返回0。 |
isLocked() | 判断锁是否被任何线程持有。检查状态是否不为0。 |
readObject(ObjectInputStream s) | 在反序列化过程中,将状态重置为未锁定状态(0)。 |
Sync
实现类
ReentrantLock
通过继承Sync
抽象类,提供了公平锁和非公平锁两种模式,默认是非公平锁。
创建方法:
ReentrantLock fairLock = new ReentrantLock(true); // 公平锁
ReentrantLock nonFairLock = new ReentrantLock(); // 默认是非公平锁
ReentrantLock nonFairLockExplicit = new ReentrantLock(false);
公平锁
当 ReentrantLock
被创建为公平锁时,线程尝试获取锁的顺序是按照请求锁的顺序来的,即先来先得。这意味着如果一个线程长时间持有一个锁,那么等待队列中的线程将按照它们请求锁的顺序获得锁。这有助于防止线程饥饿,即某些线程长时间得不到锁的情况。
initialTryLock
方法尝试在没有任何先入队线程!hasQueuedThreads()
的情况下获取锁。它首先尝试通过 compareAndSetState
方法将同步状态从 0
更新为 1
。如果成功,表示锁未被其他线程持有,当前线程将设置为自己作为独占所有者线程。如果锁已经被其他线程持有,那么会检查是否是重入锁(即持有锁的线程是当前线程),如果是,则增加锁的计数。
如果 getState()
返回 0
(表示锁当前可用),并且 compareAndSetState
成功,那么线程将获取锁。如果不是重入锁,或者锁已经被其他线程持有,那么获取锁将失败。
非公平锁
非公平锁和公平锁在initialTryLock()
代码上区别是没有使用下面这个方法,在tryAcquire()
方法没有!hasQueuedPredecessors()
。
非公平锁是 ReentrantLock
的默认模式。在非公平锁模式下,线程获取锁的顺序是不一定的,新来的线程有可能立即获得锁,即使等待队列中已经有其他线程在等待。这种模式下,系统开销较小,因为线程不必严格按照请求顺序来获得锁,这可能导致更高的吞吐量和性能。
ReentrantLock
各方法执行流程
lock
执行流程
1. lock()
方法调用
当我们调用ReentrantLock
的lock()
方法时,这个方法委托给sync
对象的lock
方法。
public void lock() {
sync.lock();
}
2. sync.lock()
尝试首次非守护锁获取
在sync
对象的lock
方法中,首先尝试一次无条件的锁获取,即initialTryLock()
。
final void lock() {
if (!initialTryLock())
acquire(1);
}
3. initialTryLock()
方法
initialTryLock()
方法尝试使用compareAndSetState
以原子方式将锁状态从0变为1。如果成功,表示获取到了锁,并记录当前线程为锁的持有者。如果锁已经被持有,但持有者是当前线程,那么它将增加锁的计数(重入锁),并成功返回。如果以上两种情况都不满足,则返回失败。
查看代码
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // 首次尝试是无条件的
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // 溢出
throw new Error("超过最大锁计数");
setState(c);
return true;
} else
return false;
}
4. acquire(int arg)
方法
如果initialTryLock()
失败,那么会调用acquire
方法,这个方法会尝试通过tryAcquire
再次获取锁。如果失败,将执行更复杂的获取逻辑。
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
5. tryAcquire(int acquires)
方法
这个方法再次尝试设置锁状态。如果锁状态为0,表示锁未被占用,那么尝试设置状态并设置当前线程为锁的持有者。
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
6. acquire(Node node, int arg, ...)
方法
如果tryAcquire
再次失败,那么会执行这个方法,这个方法包含了获取锁的复杂逻辑,包括线程的排队、等待、唤醒等。这是一个循环过程,线程可能会经历多次尝试,直到获取到锁或者被中断。
查看代码
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
初始化变量:
current
:获取当前线程。spins
和postSpins
:用于控制自旋次数,以减少在重新竞争时的不公平性。interrupted
:标记线程是否被中断。first
:标记节点是否为队列中的第一个节点。pred
:用于存储前一个节点。
无限循环尝试获取锁:
- 循环的目的是不断尝试获取锁,直到成功或者线程被中断。
检查节点是否为队列中的第一个节点:
- 如果节点不是第一个节点,且前一个节点(
pred
)存在,更新first
标志。 - 如果前一个节点被取消了,清理队列并继续循环。
- 如果前一个节点的上一个节点为空(说明前一个节点可能是头节点),执行自旋等待。
尝试获取锁:
- 如果节点是第一个节点或者还没有入队,尝试通过
tryAcquire(arg)
获取锁。 - 如果获取成功,更新队列头部,清除等待状态,并返回。
- 如果获取失败,继续执行。
创建节点:
- 如果节点还没有创建,根据是否是共享模式创建相应的节点。
尝试将节点加入队列:
- 如果节点还没有入队,尝试将节点加入队列尾部。
- 如果尾部节点为空,尝试初始化头节点。
- 如果通过CAS操作成功将节点加入队列,更新队列的尾部指针。
处理自旋和等待状态:
- 如果节点是第一个节点且有剩余的自旋次数,执行自旋等待。
- 如果节点的状态为0(未设置等待状态),将其设置为等待状态并重新尝试获取锁。
线程等待:
- 如果不在定时模式下,使用
LockSupport.park()
挂起当前线程。 - 如果在定时模式下,计算剩余时间,并使用
LockSupport.parkNanos()
挂起线程直到超时或被唤醒。 - 如果线程被中断且中断标志位为true,退出循环。
取消获取:
- 如果线程被中断或者超时,调用
cancelAcquire(node, interrupted, interruptible)
取消获取操作。
返回结果:
- 如果线程成功获取了锁,返回1。
- 如果线程被中断或者超时,返回相应的错误码。
unlock
执行流程
1. unlock方法
unlock
方法是ReentrantLock
对外提供的一个释放锁的方法。
public void unlock() {
sync.release(1);
}
这个方法调用了一个sync.release(1)
,这里的sync
是ReentrantLock
的一个内部类,实现了AQS
(AbstractQueuedSynchronizer)。
2. release方法
release
方法定义在AQS
中,用于释放锁资源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
这个方法首先调用tryRelease
尝试释放锁,如果释放成功,则调用signalNext
方法唤醒后继节点。
3. tryRelease方法
tryRelease
方法是ReentrantLock
内部类Sync
中重写的AQS
的方法,用于尝试释放锁。
查看代码
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
这个方法首先获取当前锁的状态c
,然后判断当前线程是否为持有锁的线程,如果不是,则抛出异常。如果当前线程是持有锁的线程,则减少锁的计数,如果锁的计数变为0,表示锁被完全释放,将持有锁的线程设置为null,并更新锁的状态。
4. signalNext方法
signalNext
方法是AQS
中用于唤醒后继节点的方法。
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
这个方法首先获取头节点的后继节点,如果后继节点不为null,并且状态不是0(表示节点处于等待状态),则调用getAndUnsetStatus
方法更新节点状态,并调用LockSupport.unpark
方法唤醒节点中的线程。
5. getAndUnsetStatus方法
getAndUnsetStatus
方法是AQS
中用于更新节点状态的方法。
final int getAndUnsetStatus(int v) { // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}
这个方法使用CAS
操作更新节点的状态。
总结
ReentrantLock
的unlock
方法执行流程如下:
- 调用
sync.release(1)
,其中sync
是ReentrantLock
内部类,实现了AQS
。 - 在
release
方法中,首先调用tryRelease
尝试释放锁。 - 在
tryRelease
方法中,首先获取当前锁的状态,然后判断当前线程是否为持有锁的线程,如果不是,则抛出异常。如果当前线程是持有锁的线程,则减少锁的计数,如果锁的计数变为0,表示锁被完全释放,将持有锁的线程设置为null,并更新锁的状态。 - 如果锁释放成功,调用
signalNext
方法唤醒后继节点。 - 在
signalNext
方法中,获取头节点的后继节点,如果后继节点不为null,并且状态不是0(表示节点处于等待状态),则调用getAndUnsetStatus
方法更新节点状态,并调用LockSupport.unpark
方法唤醒节点中的线程。 - 在
getAndUnsetStatus
方法中,使用CAS
操作更新节点的状态。
tryLock
执行流程
tryLock
方法是 ReentrantLock
类中的一个方法,它尝试获取锁,如果锁没有被其他线程持有,则获取成功并返回 true
;如果锁已经被其他线程持有,则获取失败并返回 false
。这个方法不会导致线程阻塞,适用于需要尝试获取锁而不是一直等待的场景。
以下是 tryLock
方法的执行流程:
- 获取当前线程
current
。 - 获取锁的状态
c
。这个状态表示锁被获取的次数,如果是 0,则表示锁当前没有被任何线程持有。 - 如果
c
为 0,则表示锁没有被其他线程持有。这时,使用compareAndSetState(0, 1)
方法尝试将状态从 0 设置为 1。这是一个原子操作,如果成功,则表示锁已经被当前线程获取。 - 如果
compareAndSetState
返回true
,则将当前线程设置为独占锁的拥有者setExclusiveOwnerThread(current)
,并返回true
,表示获取锁成功。 - 如果
c
不为 0,则表示锁已经被某个线程持有。这时,检查持有锁的线程是否为当前线程getExclusiveOwnerThread() == current
。 - 如果锁的持有者是当前线程,则表示这是一个重入锁,可以增加锁的计数。将
c
加 1 并更新状态setState(c)
,然后返回true
,表示获取锁成功。 - 如果锁的持有者不是当前线程,或者状态
c
不是 0 也不是由当前线程持有,则直接返回false
,表示获取锁失败。 - 如果在增加锁的计数时发生溢出(
c < 0
),则抛出Error
,这通常表示锁被重入的次数超过了Integer.MAX_VALUE
,这是一个不应该发生的异常情况。
tryLock(timeout,unit)
执行流程
ReentrantLock
的 tryLock(timeout, unit)
方法用于尝试在指定的时间内获取锁。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryLockNanos(unit.toNanos(timeout));
}
如果成功获取到锁,则返回 true
;如果在指定的时间内没有获取到锁,则返回 false
。这个方法在尝试获取锁的过程中,如果线程被中断,会抛出 InterruptedException
。
执行流程如下:
检查线程中断状态:首先检查当前线程是否已经被中断,如果是,则抛出 InterruptedException
。
final boolean tryLockNanos(long nanos) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return initialTryLock() || tryAcquireNanos(1, nanos);
}
首次尝试获取锁:调用 initialTryLock()
方法进行首次尝试获取锁。
查看代码
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
- 如果锁的状态为
0
(表示锁当前可用),则使用compareAndSetState(0, 1)
尝试将锁状态设置为1
。如果设置成功,将当前线程设置为锁的持有者,并返回true
。 - 如果当前线程已经持有锁(即
getExclusiveOwnerThread() == current
),则增加锁的重入次数,并返回true
。 - 如果以上两种情况都不满足,说明锁已被其他线程持有,返回
false
。
限时尝试获取锁:如果首次尝试获取锁失败,调用 tryAcquireNanos(int arg, long nanosTimeout)
方法进行限时尝试获取锁。
查看代码
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquire(arg))
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, false, true, true,
System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
- 首先再次检查线程中断状态,如果线程已被中断,则抛出
InterruptedException
。 - 尝试再次获取锁。如果成功,则返回
true
。 - 如果指定的时间
nanosTimeout
小于等于0
,则直接返回false
。 - 使用
acquire(null, arg, false, true, true, System.nanoTime() + nanosTimeout)
方法在指定的时间内尝试获取锁。这个方法会阻塞当前线程,直到获取到锁或者超时。如果获取到锁,返回true
;如果超时,返回false
。