互斥锁ReentrantLock一探究竟

讲互斥锁之前,我们先学习一下基础的框架。

Lock接口

我们知道,从jdk 1.5之后在juc包中新增了Lock接口,通过Lock接口实现锁的功能,可以提供和synchronized关键字差不多的多线程同步功能。
Lock操作使用还是非常简单,进行lock操作后,只需要记得在finally中进行unlock操作即可,虽然不像synchronized那样可以更隐秘的进行同步操作,但是也很便捷。

阅读源码可以发现, Lock接口包括6个基本操作,如下面所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//获取锁操作,调用这个方法当前线程将会获取锁。
void lock();

//可中断的获取锁,和lock方法的不同是这个方法可以响应中断,也就是说当前线程在获取锁时可能中断。
void lockInterruptibly() throws InterruptedException;

//尝试非阻塞地获取锁,如果成功获取就返回true,否则返回false
boolean tryLock();

//超时获取锁。三种情况下会返回。1: 当前线程获取了锁,返回true,2: 当前线程在time内被中断,3: time时间到,返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

//释放锁
void unlock();

//获取等待通知的instance,并且和当前线程绑定,当前线程只有获得了锁,才能调用wait方法,调用后当前线程就会释放锁。
Condition newCondition();

Lock接口主要就讲到这里,接下来我们就看看互斥锁的核心部分AbstractQueuedSynchronizer。其实Lock的实现都是通过AbstractQueuedSynchronizer的子类来完成的多线程加锁释放锁的。

AbstractQueuedSynchronizer队列同步器

AbstractQueuedSynchronizer是一个抽象类(以下我们简称AQS),主要是实现锁和其他同步功能组件的一个基本模板框架,维护了一个volatile类型的int变量state表示同步的状态,通过一个FIFO的队列来进行线程的排队管理。

AQS的基本框架和方法

当我们要实现一个同步的锁时,我们只需要集成AbstractQueuedSynchronizer并且实现它的抽象方法来对state进行操作,它提供了三个方法来操作state。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 同步状态
*/

private volatile int state;

//获取当前同步状态
protected final int getState() {
return state;
}

//设置当前同步状态
protected final void setState(int newState) {
state = newState;
}

//使用CAS设置当前状态,保证原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

其实在使用CAS进行原子操作时,用到的Unsafe类,本质上就和我们之前介绍原子类AtomicInteger等一系列中讲到的就一样啦,这里就不再赘述了,主要关心AQS的框架和模板方法。说到模板方法,AQS本身是一个抽象类,也提供了一些方法供继承实现的类来实现这些方法,这些方法包括可重写的与不可重写的(final),这里我们先看个大概,接下来研究ReentrantLock时会详细看到。

1
2
3
4
5
6
7
8
9
10
11
//可重写的:
//独占的获取同步状态
protected boolean tryAcquire(int arg);
//独占的释放同步状态
protected boolean tryRelease(int arg)
//共享的获取同步状态
protected int tryAcquireShared(int arg);

//共享的释放同步状态
protected boolean tryReleaseShared(int arg);
//判断当前同步器是否被当前线程所独占
protected boolean isHeldExclusively();

我们可以看到,其实AQS提供了独占式和共享式两种对同步状态的操作,独占锁是一个锁在同一个时间点只能有一个线程占有和操作同步状态,并且又分为公平锁和非公平锁,比如我们接下来会讲到的ReentrantLock。共享锁是指能被多个线程同时拥有,能被共享的锁,例如ReentrantReadWriteLock。并且自定义的同步组件可以使用重写这些基本方法以及final的模板方法来实现自己的同步语义。

CLH队列

我们之前也介绍过了,AQS中维护一个FIFO的双向队列,实际上这个队列就是CLH队列。在独占锁的情况下,竞争资源和锁在一个时间点只能被一个线程访问并获取占有,其它的线程则需要等待。CLH就是管理这些“等待锁的线程”的队列。所以说,CLH队列主要用来管理AQS的同步状态。当前的线程获取同步状态失败时,AQS会将当前线程以及等待信息构造为一个Node,并且将这个Node加入CLH队列,并且将这个线程阻塞掉,等待下次同步状态被释放时被首节点中的线程唤醒。

Node的属性类型和名称,我们先列出一些基本的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
*等待的状态。主要包括5个状态,1. CANCELLED =1, 同步队列汇总等待的线程等待超时或者被中断,需要从同步队列中取消等待,Node进入这个状态并不会发生什么变化。
2. SIGNAL = -1。 表明当前Node的下一个后继结点需要被唤醒,一般是当前结点的线程释放了同步状态或者被取消后的事情。
3. CONDITION = -2. 表明结点是等待在Condition上,只有其他线程对这个condition调用了signal后,这个结点才会从等待队列转移到同步队列。
4. PROPAGATE = -3. 表镜下一个共享的获取同步状态的获取会无条件的传播下去。
5. 0 初始化状态,不符合以上任何条件的。
*/

volatile int waitStatus;

/*
* 后继结点,也就是当前结点会唤醒的下一个结点
*/

volatile Node next;

/*
* 前驱结点, 在结点加入到同步队列后进行设置的。
*/

volatile Node prev;

/*
* 等待队列中的后继结点。如果当前结点是共享的,那么这个字段就是一个SHARED常量,也就是说结点类型和等待队列中的后继结点公用一个字段。
*/

Node nextWaiter;

/*
* 获取同步状态的线程
*/

volatile Thread thread;

Node是构成同步队列的基础,在AQS中拥有一个head结点和tail结点,凡是没有成功获取到同步状态的线程都将会构造为Node加入到该队列的尾部。

需要指出的是,加入队列时,是通过CAS操作进行的。这一过程我们在后面会详细介绍,先还是关注一下CLH队列。

我们已经知道了,AQS包括两个引用,一个指向头结点,另一个指向尾结点,当有新的结点要加入后,通过CAS操作设置尾结点,并且将尾结点的prev指向之前的尾结点,这样就完成了结点的插入。

首节点是获取到同步状态的结点,当首结点的线程在释放同步状态后,就会唤醒后继结点,而此时后继结点如果也能成功的获取到同步状态,就会将自己设置为首结点。这时候AQS也会更新自己的head结点的引用。

以上大致就是CLH队列的工作原理和过程,知道了这些之后我们结合ReentrantLock来具体看看同步状态的获取和释放。

ReentrantLock细究

ReentrantLock实现了公平锁和非公平锁。我们前面已经介绍了,ReentrantLock的核心在于AQS,在ReentrantLock中有一个抽象静态内部类Sync继承了AbstractQueuedSynchronizer来进行相关的同步状态的管理。而公平锁和非公平锁则是分别通过 FairSync和NonfairSync来继承Sync来进行锁的实现的,因此我们只需要搞懂FairSync和NonfairSync即可。

公平锁FairSync

我们知道,在使用ReentrantLock时获取锁是通过lock()函数。下面,我们就通过lock()对获取公平锁的过程进行一步步的探究学习。

1
2
3
final void lock() {
acquire(1);
}

这段代码很简单,就是通过acquire(1)获取锁。这里为什么传的参数是1呢,只是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程首次获取到了,它的状态值就变成了1。于ReentrantLock是可重入锁,所以独占锁可以被同一个线程多此获取,每获取1次就将锁的状态+1。也就是说,第一次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推。

acquire

acquire是在AQS中实现的。

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上面这段代码看似很简单,其实经历了好几个过程。我们先将一下逻辑。

  1. 当前线程首先通过tryAcquire()尝试独占的获取锁。获取成功的话,直接返回,尝试失败的话,进入到等待队列排序等待。
  2. 当前线程尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将此线程加入到CLH队列末尾。
  3. 执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于是公平锁,它会根据公平性原则来获取锁。
  4. 当前线程在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回,如果当前线程在休眠等待过程中被中断过,acquireQueued会返回true,此时if条件符合,会使当前线程调用selfInterrupt()来自己给自己产生一个中断。

好,我们先了解一下这个代码的基本逻辑,接下来我们对这四个方法一一研究。

tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
//获取当前要获取同步状态的线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //没有任何线程获取锁
//如果是头结点,那么通过CAS自旋去设置同步的状态,并将当前线程设置为锁的独占拥有者。
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果当前线程已经是锁的独占拥有者,那么就更新一下同步状态。(即为可重入)
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

我们看看hasQueuedPredecessors来做什么的:

1
2
3
4
5
6
7
public final boolean hasQueuedPredecessors() {

Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

我们可以看到,返回的结果也是多个条件的判断,我们看看返回true的情况:

  1. h!=h, 也就是说CLH队列不是空,有线程在等待获取锁。
  2. h.next == null ,只有一个线程在等待,s.thread != Thread.currentThread()则是有比当前线程更早的线程在等待。
    因此可以发现hasQueuedPredecessors()通过判断当前线程的结点是不是在CLH队列的队首,来返回AQS中是不是有比当前线程等待更久的线程。和我们的分析是一致的。
    接下来我们看看CAS操作设置状态。
1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

compareAndSwapInt是unsafe类中的一个本地方法,这种类型的操作我们在AtomicInteger中已经介绍了很多次了,这里同样的,采用一个原子操作来修改state的状态值,保证了原子性。
我们接着看setExclusiveOwnerThread,这个方法是在AbstractOwnableSynchronizer中,AbstractOwnableSynchronizer是一个抽象类,AQS就是继承AbstractOwnableSynchronizer实现的。

1
2
3
4
5
6
//独占锁的当前拥有者
private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

所以setExclusiveOwnerThread能够设置当前线程为拥有锁的独占拥有者。
好了,tryAcquire内容大致就是这些了,我们可以看到就像名称所说,只是尝试获得锁,如果获取成功,就返回true,如果获取失败,可以通过其他方式再去获得锁,这些方式我们下面会讲。

addWaiter

还记得我们之前看的acquire吗,当tryAcquire获取成功返回true后,acquire就直接结束了,当tryAcquire返回false时,就会进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg),我们先看看addWaiter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
//新建一个Node,传入新的两个参数,一个是当前线程,一个是模式,这里是独占模式
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果CLH队列不为空,则通过CAS操作将当前线程插入到队列的尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果队列为空,就先创建队列,再插入结点
enq(node);
return node;
}

这里的思路还是很清晰的,主要就是将当前线程构造为结点后插入到等待队列中。其中在队列为空时走到了enq,我们看看这里面发生了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

我们可以看到,在enq中是通过一个死循环来保证结点的正确添加,在死循环中只有通过CAS将结点成功设置为尾结点后当前线程才能从enq中返回。
事实上当结点添加到队列中后,就会不停的自旋,直到条件满足获取到了同步状态才会结束自旋过程。

所以总结一下addWaiter的作用就是将当前线程构造成结点添加到CLH队列的尾部,也就是说当当前线程在tryAcquire失败时,会添加到等待队列中进行等待。

acquireQueued

当一个线程被构造给Node并且加到等待队列后,接下来就是acquireQueued来进行操作了。acquireQueued会逐步检查队列中的线程去获取同步状态,废话不说了,我们先看看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // 有助于GC回收
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

看了源码,对于传入的node,也就是当前的线程,事实上操作的是它的前驱结点
这是为什么?

  1. 首先, 头结点是成功获取到同步状态的结点,当头结点释放了同步状态后才会唤醒后继结点。并且后继结点的线程被唤醒后还需要检查自己的结点是否是头结点。
  2. 因为结点的线程在等待过程中可能会发生中断从而返回,或者前驱结点出队,只有前置结点是头结点时才可以tryAcquire,这样能够保证CLH队列的FIFO的特性。

我们看看具体的流程,只有当前置结点是头结点并且尝试获取锁成功后,会将当前的线程的结点设置为头结点,并且直接返回false,也就是没有中断过。

那么中断过的是什么过程呢?我们先看看上面有两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//当前结点获取锁失败时检查并更新状态,返回当前线程是否要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继节点的状态
int ws = pred.waitStatus;
// 如果前继节点是SIGNAL状态,说明这当前线程需要被unpark唤醒。所以可以安全的阻塞。
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//如果前继节点是取消状态,则设置当前节点的当前前继节点为原前继节点的前继节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前继节点为“0”或者“PROPAGATE”,则设置前继节点为SIGNAL状态,但是不需要阻塞。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

如果这一步返回的是true, 即当前结点的前置结点的状态为SIGNAL,那么当前线程需要被阻塞掉。阻塞的过程通过parkAndCheckInterrupt来实现。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

parkAndCheckInterrupt就是通过LockSupport来阻塞线程,并且返回线程是否中断过,同时还会清除线程中断的标记。这也是为什么在acquireQueued中会对interrupted进行操作的原因,因为需要保存线程的中断标记,关于这个我们稍后再说。
结合我们之前所讲的,我们可以先思考一下当线程被阻塞后什么时候会被唤醒?

  1. 前置结点在释放锁时会通过unpark唤醒下一个结点也就是当前线程
  2. 其他线程通过interrupt中断当前线程时会唤醒当前线程

所以我们回到acquireQueued中,看for循环中,会先判断前继结点是否是队列头结点,如果是就会尝试获得锁。
因为我们也在shouldParkAfterFailedAcquire中知道了当前线程是否会被阻塞,如果会阻塞就将其阻塞掉,并且通过变量保存一下这个线程是否被中断过。
正是有这种情况存在,所以在线程被唤醒时,必须先检查唤醒自己的线程是不是队列的头结点,这样才符合FIFO的公平性,这也是公平锁的意义。

总之,acquireQueued实现了当前线程会根据公平性原则进行阻塞等待,直到获取锁为止,并且会返回当前线程在等待过程中有没有并中断过的标记。

selfInterrupt

回到AQS中acquire的if判断,当线程获取锁失败,并且被加到队列尾部,也返回了这个线程是否要阻塞以及阻塞后返回了是否中断过的标记后,这时候再把自己中断一下。

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

根据我们之前的分析,在acquireQueued中,只有当前线程被中断过,才会走到这一步。
那么思考一下,即使是线程在阻塞状态中被中断唤醒,但是如果前面还有其他等待的线程,那么还是无法获取到锁,会再次阻塞,直到前一个结点唤醒自己。
而这个线程在真正执行之前由于走到了interrupted,所以会清除这个标记,因此需要重新中断一下当前的线程。

好了,关于公平锁的基本框架和原理就是这些了,其实回过头看看acquire,主要就是做了tryAcquire,addWaiter,acquireQueued和selfInterrupt。其中tryAcquire是一定会做的事情。

而由于ReentrantLock是可重入锁,在获取同步状态时,传入的arg就是当前线程获取锁的次数,当重入了多少次,就得unlock多少次。说到了unlock,那我们就来看一看unlock.

unlock

1
2
3
public void unlock() {
sync.release(1);
}

可以看到,unlock非常简单,就是执行了AQS的release. 我们看到参数是1,也就是每次释放时只对同步状态进行减1操作。unlock调用了AQS的release,让我们看一眼。

release

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

这种形式和思路是很像acquire的。首先会执行sync的tryRelease,如果成功就返回true,并且如果头结点不为空并且状态不为0,就唤醒下一个结点.

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

我们看看源码,首先会计算当前线程的结点在释放后的状态。然后检查当前线程,如果不是锁的独占拥有者,就会抛出异常。

当状态为减到0后,就采用setExclusiveOwnerThread,将锁的拥有者置为null,并且通过setState设置同步状态。这里的种种方法在上文已经介绍过,这里就不再赘述。
只有c==0时,意味着这个重入锁的线程已经完全释放了当前的锁,所以就返回true,其他情况返回false.

我们回到release, 当完全释放锁成功后,就会进入unparkSuccessor.

unparkSuccessor

顾名思义,应该是唤醒当前结点的后继结点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

首先,我们需要明确的是,正在进行操作的当前线程,一定是头结点。

  1. 先获取node的等待状态,如果是小于0的,首先会把waitStatus变成0。
  2. 获取当前结点的后继结点,如果结点的waitStatus>0 ,则通过for循环往下继续进行获取直到找到一个有效的(waitStatus <=0)。
  3. 如果找到了第一个有效的后继结点,就通过LockSupport的unpark唤醒它。

好了,看到这里其实对公平锁的获取与释放就很清楚了。接下来我们看看非公平锁。

NonfairSync 非公平锁

一样的学习思路,我们先看看锁的获取过程。

lock

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

从上面这个代码可以看到,lock会先通过compareAndSet(0, 1)来判断锁是不是空闲状态,如果是的,那么当前线程可以直接获取锁,否则调用acquire(1)获取锁。

  1. compareAndSetState()是CAS函数,用来比较并设置当前锁的状态。若锁的状态值为0,就设置锁的状态值为1。
  2. setExclusiveOwnerThread(Thread.currentThread())的作用是,设置当前线程为这个锁的拥有者。

acquire

直接看代码

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这里的这段代码还是来自AQS,可以看到是和公平锁是公用的一份。但是不同点在于tryAcquire是不同的。我们看看NonfairSync中尝试获取锁的实现。

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//AQS中
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

在tryAcquire中是调用了nonfairTryAcquire,执行非公平策略的尝试获取。 同样的,先获取同步状态,如果为0,说明还没有线程占有当前的锁,因此就通过CAS来设置重入的参数,并将当前线程设置为锁的
拥有者。之后就直接从这个方法中返回。

另外,如果锁的拥有者就是当前的线程,就可以实现重入的效果,即可以继续设置新的同步状态然后返回。

以上就是非公平锁的获取锁的过程,在研究了公平锁的实现后,再到这里理解这个就相对比较轻松。

至于非公平锁的释放,就是和公平锁一模一样了。

总结

其实可重入锁ReentrantLock只是对锁套了一层皮,对获取锁的过程中多了一个对当前线程和锁的拥有者的比较判断,然后以此为基础实现了公平锁和非公平锁。
核心就是我们在上文中主要研究的AQS,AbstractQueuedSynchronizer。
而要弄清楚AbstractQueuedSynchronizer,主要需要理解其中的CLH队列,即一个FIFO双向队列,实现是将每一个需要等待的线程构造成为一个结点Node,然后插入到队列中。
队列的头结点就是获取到同步状态的线程。

每次操作就是对队列的结点进行相应的操作。具体过程就参考上述的步骤吧。

关于锁和AQS也介绍的差不多了,先写到这里吧。 = =