1 引言

关于 ReentrantLock,有太多可以讨论的东西,捋一捋,循循渐进的话,我们先看以下几点:

  • ReentrantLock 底层基于 AQS 实现
  • AQS 底层又是基于 CAS 实现的
  • 可重入锁的实现原理

2 构造函数

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认的构造函数这里,创建了一个Sync,FairSync 和 NonfairSync 两者看起来是一个非常关键的组件,其实 NonfairSync 和 FairSync 都是 Sync 的子类,覆盖重写了几个方法,没什么特别的东西在里面,大概代表了一个Sync的具体实现。

我们继续往下看。

3 AQS基于无锁化的CAS机制实现高性能的加锁

3.1 lock 方法

再看一下核心方法: lock 方法

public void lock() {
	 sync.lock();
}

ReentrantLock 在进行加锁的时候,他其实是直接基于底层的 Sync 来实现的 lock 操作,这样看来,我们还需要再看看 Sync 的底层源码实现。

abstract static class Sync extends AbstractQueuedSynchronizer {
  // ……
} 

Sync 是一个抽象的静态内部类,也是 AQS 的子类,AQS 是 Java 并发包各种并发工具(锁、同步器)的底层的基础性的组件

AQS 里关键的一些东西,一个是Node(自定义数据结构,可以组成一个双向链表,也就是所谓的一个队列),另一个是 state(核心变量,加锁、释放锁都是基于state来完成的)。

AQS 底层加锁、释放锁,都是大量的基于CAS的操作来实现的,底层是基于 NonfairSync 的 lock 操作来实现加锁的。

final void lock() {
  	// 见 3.2 小节
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

3.2 compareAndSetState()

/**
* 返回值为 true 代表加锁成功
*/
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS里有一个核心的变量,state,代表了锁的状态;看一下state是否是0,如果是0的话,代表没人加过锁,此时当前线程就可以加锁,把这个state设置为1。 CAS 可以无锁化地保证一个数值修改的原子性。

compareAndSetState(0, 1)相当于是在尝试加锁,底层原来是基于Unsafe来实现的,JDK内部使用的API,基于cpu指令实现原子性的CAS(Atomic原子类底层也是基于 Unsafe 来实现的CAS操作)。

上面这行代码可以保证:在一个原子操作中,如果发现值是我们期望的这个expect值,说明符合要求,没人修改过,此时可以将这个值设置为update,state如果是0的话,就修改为1,代表加锁成功了,这个操作是CAS原子性的。

如果加锁成功了,compareAndSetState(0, 1)返回的是true,此时就说明加锁成功,它需要设置一下自己是当前加锁的线程。

关于 stateOffset,可以看看初始化的代码块:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

3.3 setExclusiveOwnerThread()

setExclusiveOwnerThread(Thread.currentThread()):设置当前线程自己是加了一个独占锁的线程,标识出来自己是加锁的线程。

/**
* 这个方法是 AQS 的父类 AbstractOwnableSynchronizer 的方法
*/
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

4 可重入性实现原理

假设线程 1 已经获得锁了,此时线程1再次进入,会是怎样的流程。

如果是一个线程可重入的加锁会是什么样子呢?是如何来实现的呢?

compareAndSetState(0, 1):这个方法一定是false,会失败,此时state = 1,不是0,CAS操作会失败,返回false,此时会执行acquire(1) 方法,这个方法时 AQS 的方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(1):此时首先会走这个方法,传递进去一个值是1,AQS的父类实现是一个空,其实是留给子类来实现的

protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
}

nonfairTryAcquire(1):这个方法会走到Sync(父类)

final boolean nonfairTryAcquire(int acquires) {
   		// 先获取到当前的线程 -> 线程1
     final Thread current = Thread.currentThread();
   		// 获取state变量值的过程,JDK源码里大量的运用了volatile,可见性的问题,保证一些关键变量,修改 -> 读取的可见性
     int c = getState();
   		// 为什么会有这段代码呢?其实进入到这里,代表他之前一定是看到state != 0,才会进入到这里
   		// 就是人家代码的健壮性,怕的是之前state != 0,所以加锁失败了,但是进入到这里,人家再次判断一下,如果state是0,那么再次尝试加锁,就怕中间有人释放了锁
     if (c == 0) {
       if (compareAndSetState(0, acquires)) {
         setExclusiveOwnerThread(current);
         return true;
       }
     }
   		// 也就是说没有人释放锁,state != 0
   		// 再次判断,如果执行这个方法的线程 = exclusiveOwnerThread(加锁的线程)
   		// 代表的就是一个线程在可重入的加锁
   		// 之前他自己加过锁,然后在这里他就再次加锁
     else if (current == getExclusiveOwnerThread()) {
   		// 此时,c = 1
   		// nextc = c(1) + acquires(1) = 2
   		// 其实就是代表了一个线程可重入加锁了1次,2代表了加锁的次数
       int nextc = c + acquires;
       if (nextc < 0) // overflow
         throw new Error("Maximum lock count exceeded");
   				// 修改这个state的值,volatile保证了可见性
       setState(nextc);
       return true;
     }
     return false;
}

假设线程2过来尝试加锁,此时的源码会如何走向呢?

final boolean nonfairTryAcquire(int acquires) {
   		// 先获取到当前的线程 -> 线程1
     final Thread current = Thread.currentThread();
   		// 获取state变量值的过程,JDK源码里大量的运用了volatile,可见性的问题,保证一些关键变量,修改 -> 读取的可见性
     int c = getState();
   		// 为什么会有这段代码呢?其实进入到这里,代表他之前一定是看到state != 0,才会进入到这里
   		// 就是人家代码的健壮性,怕的是之前state != 0,所以加锁失败了,但是进入到这里,人家再次判断一下,如果state是0,那么再次尝试加锁,就怕中间有人释放了锁
     if (c == 0) {
       if (compareAndSetState(0, acquires)) {
         setExclusiveOwnerThread(current);
         return true;
       }
     }
   		// 也就是说没有人释放锁,state != 0
   		// 再次判断,如果执行这个方法的线程 = exclusiveOwnerThread(加锁的线程)
   		// 代表的就是一个线程在可重入的加锁
   		// 之前他自己加过锁,然后在这里他就再次加锁
     else if (current == getExclusiveOwnerThread()) {
   		// 此时,c = 1
   		// nextc = c(1) + acquires(1) = 2
   		// 其实就是代表了一个线程可重入加锁了1次,2代表了加锁的次数
       int nextc = c + acquires;
       if (nextc < 0) // overflow
         throw new Error("Maximum lock count exceeded");
   			// 修改这个state的值,volatile保证了可见性
       setState(nextc);
       return true;
     }
   		// 如果已经有一个线程加了锁,其他线程此时会走到这里
   		// 此时方法认为加锁失败,返回false
     return false;
   }
public final void acquire(int arg) {
// 此时加锁失败,第一个条件是false
// 开始走第二个条件,调用acquireQueued()方法
// 将当前线程入队阻塞等待
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }

addWaiter(Node.EXCLUSIVE):EXCLUSIVE(排他性,独占锁,同一时间只能有一个线程获取到锁,此时是排他锁,独占锁)

将当前线程(线程2)封装成了一个Node,mode = EXCLUSIVE(排他锁,尝试获取一个排他锁,但是失败了),

Node 是 AQS 里面的一个静态内部类,核心属性:

/**
* 如果一个线程无法获取到锁的话,会进入一个阻塞等待的状态
* 卡住不动,线程挂起,阻塞状态又细分为很多种不同的阻塞状态:
* CANCELED、SIGNAL、CONDITION、PROPAGATE
*/
volatile int waitStatus;

/**
* 一个节点可以有上一个节点,prev指针,指向了Node的上一个Node
*/
volatile Node prev;

/**
* 一个节点还可以有下一个节点,next指针,指向了Node的下一个Node
*/
volatile Node next;

/**
* Node里面封装了一个线程
*/
volatile Thread thread;

/**
* 可以认为是下一个等待线程
*/
Node nextWaiter;

对于获取不到锁,处于等待状态的线程,会被封装为一个Node,最后多个处于阻塞等待状态的线程可以封装为一个Node双向链表,这也是为什么被叫做抽象队列同步器的原因。

// 最后返回的事线程 2 对应的 node
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
      	// 尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// pred 为 null,则无限 for 循环
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
          	// 为 null 初始化 head,并将 tail 也指向 head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
          	// 尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向传参进来的 node
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
  • headOffset -> 在AQS类里,head变量所在的位置,CAS操作的,判断一下,head变量是否为null,如果是null的话,就将head设置为空Node节点
  • compareAndSetTail(t, node):尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node
final boolean acquireQueued(final Node node, int arg) {
       boolean failed = true;
       try {
           boolean interrupted = false;
           for (;;) {
   							// 获取到node的上一个节点
   							// prev指针指向的节点
               final Node p = node.predecessor();
   							// 这个地方,其实会再次调用tryAcquire方法尝试加锁
   							// 如果加锁成功,其实是会将线程2对应的Node从队列中移除
               if (p == head && tryAcquire(arg)) {
                   setHead(node);
                   p.next = null; // help GC
                   failed = false;
                   return interrupted;
               }
   							// 如果说再次尝试加锁失败了
   							// 那么此时会判断一下,是否需要将当前线程挂起,阻塞等待
   							// 如果是需要的话,此时就会使用park操作挂起当前线程
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       // pred -> 空Node
       // 默认情况下,watiStatus应该是0,或者是空
       int ws = pred.waitStatus;
       if (ws == Node.SIGNAL)
           /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
           return true;
       if (ws > 0) {
           /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
           do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0);
           pred.next = node;
       } else {
           /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
           // 将空Node的waitStatus设置为SIGNAL
           compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       return false;
}
private final boolean parkAndCheckInterrupt() {
        // LockSupport的park操作,就是将一个线程进行挂起,不让你动了
        // 必须得有另外一个线程来对当前线程执行unpark操作,唤醒挂起的线程
        LockSupport.park(this);
        return Thread.interrupted();
}
public final void acquire(int arg) {
        // 先尝试加锁
        // 如果加锁失败,addWaiter()方法将自己挂到队列中去
        // 接着acquireQueued()方法负责park操作挂起当前线程,阻塞等待
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

5 非公平锁与公平锁

ReentrantLock 默认是非公平锁。不遵循先来先加锁的原则,可以竞争。

公平锁则保证先来先加锁,按照顺序排队来加锁。

if (c == 0) {
  if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
  }
} 

公平锁的核心,就是一行代码,每次加锁的时候,都要先判断一下,如果前面没有排队等待的线程的话,那就尝试加锁,否则是不能尝试加锁的。

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
  			// h != t,如果h != t,说明head和tail不一样,如果一样代表了队列里有人在排队
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. h != t,如果h != t,说明head和tail不一样,如果一样还要接着判断;

  2. 但是如果说head的下一个节点是null,说明没人在排队,因为有一个是null,所以此时也是返回true;

  3. 或者是s,也就是排在队头的节点,队头节点的线程如果不是当前线程,所以此时也是返回true。

公平锁,任何一个线程过来会先判断一下,当前是否有人在排队,而且是不是自己在排队,如果不是的话,说明有别人在排队,此时自己不能尝试加锁,直接入队阻塞等待。

6 tryLock()

这一小节我们来看看 tryLock 是如何实现加锁等待一段时间过后并放弃的。

方法调用流程:

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
  

核心的流程是这样的:先尝试加锁,加锁失败放到队列里,并设置一个过期时间,如果过了过期时间,会尝试再次加锁,如果加锁失败,就返回 false。

7 可重入锁释放锁的流程

方法调用链路:

public void unlock() {
    sync.release(1);
}

走到 AQS 里面

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// ReentrantLock 的方法,核心思想是判断 state - 1 是否等于 0,等于 0 则返回 true,否则返回 false
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
  	// 当前线程不等于加锁的线程,说明不是你加的锁,结果你来释放锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

此时我们可以看到,会唤醒处于队头的元素。也就是说如果一个线程来释放锁的话,他除了更新state和锁占有线程以外,他其实主要干的一个事儿就是用 LockSupport 的 unpark 操作唤醒了一个处于队头的一个线程。

队头线程此时被unpark唤醒之后会干什么?我们可以看 parkAndCheckInterrupt() 方法:

private final boolean parkAndCheckInterrupt() {
				// 某一个线程其实是在这里会被挂起
        LockSupport.park(this);
        return Thread.interrupted();
}

如果一旦被unpark唤醒之后,就会在这里苏醒过来,重新进入一个for循环里面

for (;;) {
     final Node p = node.predecessor();
     if (p == head && tryAcquire(arg)) {
         setHead(node);
         p.next = null; // help GC
         failed = false;
         return interrupted;
     }
     if (shouldParkAfterFailedAcquire(p, node) &&
         parkAndCheckInterrupt())
         interrupted = true;
}

此时线程2会再次尝试去获取锁,因为他是队头线程,他的上一个节点一定就是那个head指针指向的节点了