JDK 源码阅读010:ReentrantLock
1 引言
关于 ReentrantLock,有太多可以讨论的东西,捋一捋,循循渐进的话,我们先看以下几点:
- ReentrantLock 底层基于 AQS 实现
- AQS 底层又是基于 CAS 实现的
- 可重入锁的实现原理
2 构造函数
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默认的构造函数这里,创建了一个Sync,FairSync 和 NonfairSync 两者看起来是一个非常关键的组件,其实 NonfairSync 和 FairSync 都是 Sync 的子类,覆盖重写了几个方法,没什么特别的东西在里面,大概代表了一个Sync的具体实现。
我们继续往下看。
3 AQS基于无锁化的CAS机制实现高性能的加锁
3.1 lock 方法
再看一下核心方法: lock 方法
public void lock() {
sync.lock();
}
ReentrantLock 在进行加锁的时候,他其实是直接基于底层的 Sync 来实现的 lock 操作,这样看来,我们还需要再看看 Sync 的底层源码实现。
abstract static class Sync extends AbstractQueuedSynchronizer {
// ……
}
Sync 是一个抽象的静态内部类,也是 AQS 的子类,AQS 是 Java 并发包各种并发工具(锁、同步器)的底层的基础性的组件
AQS 里关键的一些东西,一个是Node(自定义数据结构,可以组成一个双向链表,也就是所谓的一个队列),另一个是 state(核心变量,加锁、释放锁都是基于state来完成的)。
AQS 底层加锁、释放锁,都是大量的基于CAS的操作来实现的,底层是基于 NonfairSync 的 lock 操作来实现加锁的。
final void lock() {
// 见 3.2 小节
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
3.2 compareAndSetState()
/**
* 返回值为 true 代表加锁成功
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS里有一个核心的变量,state,代表了锁的状态;看一下state是否是0,如果是0的话,代表没人加过锁,此时当前线程就可以加锁,把这个state设置为1。 CAS 可以无锁化地保证一个数值修改的原子性。
compareAndSetState(0, 1)
相当于是在尝试加锁,底层原来是基于Unsafe来实现的,JDK内部使用的API,基于cpu指令实现原子性的CAS(Atomic原子类底层也是基于 Unsafe 来实现的CAS操作)。
上面这行代码可以保证:在一个原子操作中,如果发现值是我们期望的这个expect值,说明符合要求,没人修改过,此时可以将这个值设置为update,state如果是0的话,就修改为1,代表加锁成功了,这个操作是CAS原子性的。
如果加锁成功了,compareAndSetState(0, 1)返回的是true,此时就说明加锁成功,它需要设置一下自己是当前加锁的线程。
关于 stateOffset,可以看看初始化的代码块:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
3.3 setExclusiveOwnerThread()
setExclusiveOwnerThread(Thread.currentThread())
:设置当前线程自己是加了一个独占锁的线程,标识出来自己是加锁的线程。
/**
* 这个方法是 AQS 的父类 AbstractOwnableSynchronizer 的方法
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
4 可重入性实现原理
假设线程 1 已经获得锁了,此时线程1再次进入,会是怎样的流程。
如果是一个线程可重入的加锁会是什么样子呢?是如何来实现的呢?
compareAndSetState(0, 1)
:这个方法一定是false,会失败,此时state = 1,不是0,CAS操作会失败,返回false,此时会执行acquire(1) 方法,这个方法时 AQS 的方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(1)
:此时首先会走这个方法,传递进去一个值是1,AQS的父类实现是一个空,其实是留给子类来实现的
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire(1)
:这个方法会走到Sync(父类)
final boolean nonfairTryAcquire(int acquires) {
// 先获取到当前的线程 -> 线程1
final Thread current = Thread.currentThread();
// 获取state变量值的过程,JDK源码里大量的运用了volatile,可见性的问题,保证一些关键变量,修改 -> 读取的可见性
int c = getState();
// 为什么会有这段代码呢?其实进入到这里,代表他之前一定是看到state != 0,才会进入到这里
// 就是人家代码的健壮性,怕的是之前state != 0,所以加锁失败了,但是进入到这里,人家再次判断一下,如果state是0,那么再次尝试加锁,就怕中间有人释放了锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 也就是说没有人释放锁,state != 0
// 再次判断,如果执行这个方法的线程 = exclusiveOwnerThread(加锁的线程)
// 代表的就是一个线程在可重入的加锁
// 之前他自己加过锁,然后在这里他就再次加锁
else if (current == getExclusiveOwnerThread()) {
// 此时,c = 1
// nextc = c(1) + acquires(1) = 2
// 其实就是代表了一个线程可重入加锁了1次,2代表了加锁的次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 修改这个state的值,volatile保证了可见性
setState(nextc);
return true;
}
return false;
}
假设线程2过来尝试加锁,此时的源码会如何走向呢?
final boolean nonfairTryAcquire(int acquires) {
// 先获取到当前的线程 -> 线程1
final Thread current = Thread.currentThread();
// 获取state变量值的过程,JDK源码里大量的运用了volatile,可见性的问题,保证一些关键变量,修改 -> 读取的可见性
int c = getState();
// 为什么会有这段代码呢?其实进入到这里,代表他之前一定是看到state != 0,才会进入到这里
// 就是人家代码的健壮性,怕的是之前state != 0,所以加锁失败了,但是进入到这里,人家再次判断一下,如果state是0,那么再次尝试加锁,就怕中间有人释放了锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 也就是说没有人释放锁,state != 0
// 再次判断,如果执行这个方法的线程 = exclusiveOwnerThread(加锁的线程)
// 代表的就是一个线程在可重入的加锁
// 之前他自己加过锁,然后在这里他就再次加锁
else if (current == getExclusiveOwnerThread()) {
// 此时,c = 1
// nextc = c(1) + acquires(1) = 2
// 其实就是代表了一个线程可重入加锁了1次,2代表了加锁的次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 修改这个state的值,volatile保证了可见性
setState(nextc);
return true;
}
// 如果已经有一个线程加了锁,其他线程此时会走到这里
// 此时方法认为加锁失败,返回false
return false;
}
public final void acquire(int arg) {
// 此时加锁失败,第一个条件是false
// 开始走第二个条件,调用acquireQueued()方法
// 将当前线程入队阻塞等待
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter(Node.EXCLUSIVE)
:EXCLUSIVE(排他性,独占锁,同一时间只能有一个线程获取到锁,此时是排他锁,独占锁)
将当前线程(线程2)封装成了一个Node,mode = EXCLUSIVE(排他锁,尝试获取一个排他锁,但是失败了),
Node 是 AQS 里面的一个静态内部类,核心属性:
/**
* 如果一个线程无法获取到锁的话,会进入一个阻塞等待的状态
* 卡住不动,线程挂起,阻塞状态又细分为很多种不同的阻塞状态:
* CANCELED、SIGNAL、CONDITION、PROPAGATE
*/
volatile int waitStatus;
/**
* 一个节点可以有上一个节点,prev指针,指向了Node的上一个Node
*/
volatile Node prev;
/**
* 一个节点还可以有下一个节点,next指针,指向了Node的下一个Node
*/
volatile Node next;
/**
* Node里面封装了一个线程
*/
volatile Thread thread;
/**
* 可以认为是下一个等待线程
*/
Node nextWaiter;
对于获取不到锁,处于等待状态的线程,会被封装为一个Node,最后多个处于阻塞等待状态的线程可以封装为一个Node双向链表,这也是为什么被叫做抽象队列同步器的原因。
// 最后返回的事线程 2 对应的 node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// pred 为 null,则无限 for 循环
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 为 null 初始化 head,并将 tail 也指向 head
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向传参进来的 node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
- headOffset -> 在AQS类里,head变量所在的位置,CAS操作的,判断一下,head变量是否为null,如果是null的话,就将head设置为空Node节点
- compareAndSetTail(t, node):尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取到node的上一个节点
// prev指针指向的节点
final Node p = node.predecessor();
// 这个地方,其实会再次调用tryAcquire方法尝试加锁
// 如果加锁成功,其实是会将线程2对应的Node从队列中移除
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果说再次尝试加锁失败了
// 那么此时会判断一下,是否需要将当前线程挂起,阻塞等待
// 如果是需要的话,此时就会使用park操作挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred -> 空Node
// 默认情况下,watiStatus应该是0,或者是空
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 将空Node的waitStatus设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// LockSupport的park操作,就是将一个线程进行挂起,不让你动了
// 必须得有另外一个线程来对当前线程执行unpark操作,唤醒挂起的线程
LockSupport.park(this);
return Thread.interrupted();
}
public final void acquire(int arg) {
// 先尝试加锁
// 如果加锁失败,addWaiter()方法将自己挂到队列中去
// 接着acquireQueued()方法负责park操作挂起当前线程,阻塞等待
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
5 非公平锁与公平锁
ReentrantLock 默认是非公平锁。不遵循先来先加锁的原则,可以竞争。
公平锁则保证先来先加锁,按照顺序排队来加锁。
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
公平锁的核心,就是一行代码,每次加锁的时候,都要先判断一下,如果前面没有排队等待的线程的话,那就尝试加锁,否则是不能尝试加锁的。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t,如果h != t,说明head和tail不一样,如果一样代表了队列里有人在排队
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
h != t,如果h != t,说明head和tail不一样,如果一样还要接着判断;
-
但是如果说head的下一个节点是null,说明没人在排队,因为有一个是null,所以此时也是返回true;
-
或者是s,也就是排在队头的节点,队头节点的线程如果不是当前线程,所以此时也是返回true。
公平锁,任何一个线程过来会先判断一下,当前是否有人在排队,而且是不是自己在排队,如果不是的话,说明有别人在排队,此时自己不能尝试加锁,直接入队阻塞等待。
6 tryLock()
这一小节我们来看看 tryLock 是如何实现加锁等待一段时间过后并放弃的。
方法调用流程:
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
核心的流程是这样的:先尝试加锁,加锁失败放到队列里,并设置一个过期时间,如果过了过期时间,会尝试再次加锁,如果加锁失败,就返回 false。
7 可重入锁释放锁的流程
方法调用链路:
public void unlock() {
sync.release(1);
}
走到 AQS 里面
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantLock 的方法,核心思想是判断 state - 1 是否等于 0,等于 0 则返回 true,否则返回 false
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 当前线程不等于加锁的线程,说明不是你加的锁,结果你来释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
此时我们可以看到,会唤醒处于队头的元素。也就是说如果一个线程来释放锁的话,他除了更新state和锁占有线程以外,他其实主要干的一个事儿就是用 LockSupport 的 unpark 操作唤醒了一个处于队头的一个线程。
队头线程此时被unpark唤醒之后会干什么?我们可以看 parkAndCheckInterrupt()
方法:
private final boolean parkAndCheckInterrupt() {
// 某一个线程其实是在这里会被挂起
LockSupport.park(this);
return Thread.interrupted();
}
如果一旦被unpark唤醒之后,就会在这里苏醒过来,重新进入一个for循环里面
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
此时线程2会再次尝试去获取锁,因为他是队头线程,他的上一个节点一定就是那个head指针指向的节点了