这篇文章是AQS系列的最后一篇文章,也是非常重要的一篇,因为这篇文章将引入并发编程中非常重要的一个概念:条件变量。在聊条件变量之前我想先聊聊管程(monitor),下面是对管程的描述:
在并发程序中,管程是一种同步结构,它不仅允许线程拥有互斥和等待条件变化的能力,其还可以告诉其他线程条件是否满足。管程是由一个互斥量和多个条件变量构成,一个条件变量实质上是一个等待条件的容器。在再次获得互斥量执行任务之前,管程给线程提供了暂时放弃互斥量有序等待条件满足的机制
AQS其实就是对管程这个模型的具体实现,我们回想AQS系列的前四篇文章,就是对互斥量的实现,而今天,介绍的就是条件变量。每个条件变量都会有两个方法,唤醒和等待。当条件满足时,我们就会通过唤醒方法将条件容器内的线程放入同步队列中;如果不满足条件,我们就会通过等待方法将线程阻塞然后放入条件队列中。下图是管程的模型图,正确的来说是Mesa管程的模型图,管程一共有三种类型, AQS使用的是Mesa(PS:原图是从维基上copy的,自己改了下,这里申明下)
我们可以看到,里面很多的小圆圈就是我们前面说的包装了线程的Node,线程只有进入到临界区,即拿到互斥量(锁)了以后,才能够调用唤醒方法notify和等待方法wait。管程,互斥体,信号量这些都是操作系统里面的知识,这些虽然不难,但却很重要,今天主要是讲AQS中与条件变量相关的ConditionObject类,所以就不过多的扩展了,这东西真要讲可能就是一篇单独的文章了,如果大家感兴趣,可以自己去网上找找相关资料看下
一. 属性
我们先看下ConditionObject中的属性
-
/** First node of condition queue. */
-
private
transient Node firstWaiter;
-
/** Last node of condition queue. */
-
private
transient Node lastWaiter;
前面说了,每个条件变量都维护了一个容器,ConditionObject中的容器就是单向链表队列,上面的属性就是队列的头结点firstWaiter和尾结点lastWaiter,需要注意,条件队列中的头结点不是虚拟头结点,而是包装了等待线程的节点!其类型和同步队列一样,也是使用AQS的内部类Node来构成,但与同步队列不同的是,条件队列是一个单向链表,所以他并没有使用Node类中的next属性来关联后继Node,而使用的nextWaiter
-
volatile Node prev;
-
volatile Node next;
-
Node nextWaiter;
这里我们需要注意,nextWaiter是没用volatile修饰的,为什么呢?因为线程在调用await方法进入条件队列时,是已经拥有了锁的,此时是不存在竞争的情况,所以无需通过volatile和cas来保证线程安全。而进入同步队列的都是抢锁失败的,所以肯定是没有锁的,故要考虑线程安全
最后需要注意一点的是,条件队列里面的Node只会存在CANCELLED和CONDITION的状态
属性知道了后,我们来看看方法吧,我们首先介绍唤醒相关的方法
二. 方法signalAll、signal
1 signalAll
顾名思义,就是将条件队列中的所有Node移到同步队列中,然后根据条件再唤醒它们去尝试获得锁
-
public final void signalAll() {
-
if (!isHeldExclusively())
-
throw
new IllegalMonitorStateException();
-
Node first = firstWaiter;
-
if (first !=
null)
-
doSignalAll(first);
-
}
首先我们会通过我们子类复写的方法isHeldExclusively来看此时的线程是否已经获得了锁。前面说过只有获得了锁的线程才能够去唤醒条件队列中的Node。如果获得了锁,我们会判断条件队列的头结点是否为null,为null则说明条件队列中没有阻塞的Node;如果不为null,则会通过doSignalAll方法来将条件队列中的所以Node移动到同步队列中
1.1 doSignalAll
-
private void doSignalAll(Node first) {
-
lastWaiter = firstWaiter =
null;
-
do {
-
// 将next指向first的后继Node
-
Node next = first.nextWaiter;
-
// 切断first与后继Node的联系
-
first.nextWaiter =
null;
-
// 将此node转移到同步队列中
-
transferForSignal(first);
-
// 将first指向first的后继Node
-
first = next;
-
// 在判断此时的first是否为null,不是则继续循环
-
}
while (first !=
null);
-
}
因为是移出条件队列中所有的Node,所以一开始我们通过将头结点和尾节点置为null来“清空”条件队列,然后通过do-while循环将条件队列中所有节点通过transferForSignal方法一个一个转移到同步队列中
1.2 transferForSignal
-
final boolean transferForSignal(Node node) {
-
// 说明此节点状态为CANCELLED,所以跳过该节点(GC会回收)
-
if (!compareAndSetWaitStatus(node, Node.CONDITION,
0))
-
return
false;
-
// 入队方法(独占锁获取中详细阐述过)
-
Node p = enq(node);
-
int ws = p.waitStatus;
-
if (ws >
0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
-
LockSupport.unpark(node.thread);
-
return
true;
-
}
我们首先会通过CAS操作来将Node的状态置为0,如果失败了,说明此时Node状态是CANCELLED,则我们跳过,返回false;如果Node状态成功置为了0,我们就通过enq方法进行入队,此方法已在"独占锁的获取"详细说过,但这里还是需要提醒一下,enq返回的是Node的前驱节点。然后我们会根据前驱节点的状态来看此时是否要唤醒此节点,如果是下面这两种情况,则会将其唤醒,去尝试获取锁
-
如果前驱节点状态是CANCELLED
-
前驱节点不是CANCELLED状态,但CAS将状态变为SIGNAL失败
如果将前驱节点赋值SIGNAL成功了,则该节点就需要等到前驱节点释放锁之后被唤醒了,我们需要注意,只要节点状态不是CANCELLED,transferForSignal方法最后都是返回true
2 signal
signalAll是将条件队列中所有的Node转移到同步队列,signal则只转移条件队列中的第一个状态不为CANNCELLED的Node,直接看源码
-
public final void signal() {
-
if (!isHeldExclusively())
-
throw
new IllegalMonitorStateException();
-
Node first = firstWaiter;
-
if (first !=
null)
-
doSignal(first);
-
}
和signalAll基本一样,不同点只在doSignal方法,我们接着来看doSignal
-
private void doSignal(Node first) {
-
do {
-
// 将firstWaiter指向传入的first的后继节点,
-
// 然后判断firstWaiter是否为null,
-
if ( (firstWaiter = first.nextWaiter) ==
null)
-
lastWaiter =
null;
-
first.nextWaiter =
null;
-
}
while (!transferForSignal(first) &&
-
(first = firstWaiter) !=
null);
-
}
我们可以看到方法里面是个do-While的循环,我们首先将firstWaiter指向first的后继节点,然后判断first的后继节点是否为空,如果为空,则说明条件队列中只有first这一个节点,所以我们将整个队列清空,即将lastWaiter = null。然后我们再将first的的nextWaiter指向null,然后进入while条件语句中。
while条件语句中,首先调用transferForSignal,如果返回为false,说明节点进入同步队列失败(已经被取消了),则我们会判断此节点的下一个节点是否为null,即(first = firstWaiter) != null) ,如果不为null,则会再次进入循环将这个节点进行入队,否则就不会进入到循环队列了;当然,如果transferForSignal返回true,则说明此节点入队成功了,则我们就会退出循环了
三. 方法wait—阻塞前
唤醒方法wait,我们分两小节讲,这一小节我们讲解wait方法线程阻塞前的代码,下一小节我们讲wait被唤醒后的代码
1 await
与唤醒方法相反,wait就是将节点入队并阻塞,等到其他线程唤醒(signal)或者自身中断后再重新去获取锁
-
public final void await() throws InterruptedException {
-
// 如果此线程被中断过,直接抛中断异常
-
if (Thread.interrupted())
-
throw
new InterruptedException();
-
// 将当前线程包装成节点放入条件队列
-
Node node = addConditionWaiter();
-
// 释放当前线程持有的额锁
-
long savedState = fullyRelease(node);
-
// 初始化中断模式参数
-
int interruptMode =
0;
-
// 检查节点s会否在同步队列中
-
while (!isOnSyncQueue(node)) {
-
// 不在同步队列中则阻塞此线程
-
LockSupport.park(
this);
-
if ((interruptMode = checkInterruptWhileWaiting(node)) !=
0)
-
break;
-
}
-
// 被唤醒后再去获取锁
-
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
-
interruptMode = REINTERRUPT;
-
// 当线程是被中断唤醒时,node和后继节点是没有断开的
-
if (node.nextWaiter !=
null)
// clean up if cancelled
-
unlinkCancelledWaiters();
-
// 根据异常标志位对异常进行处理
-
if (interruptMode !=
0)
-
reportInterruptAfterWait(interruptMode);
1.1 addConditionWaiter
-
private Node addConditionWaiter() {
-
Node t = lastWaiter;
-
// If lastWaiter is cancelled, clean out.
-
if (t !=
null && t.waitStatus != Node.CONDITION) {
-
unlinkCancelledWaiters();
-
t = lastWaiter;
-
}
-
Node node =
new Node(Thread.currentThread(), Node.CONDITION);
-
if (t ==
null)
-
firstWaiter = node;
-
else
-
t.nextWaiter = node;
-
lastWaiter = node;
-
return node;
-
}
首先我们会把节点t指向lastWaiter,如果lastWaiter不是null且它的等待状态不是CONDITION,说明lastWaiter的状态是CANCELLED,所以我们会通过unlinkCancelledWaiters方法来移除条件队列中所有CANCELLED的节点,然后将t指向新的lasterWaiter,所以我们可以看到,只要尾结点是CANCELLED,就会将条件队列的所有CANCELLED节点移除
然后我们会将当前线程包装成一个节点,但是与同步队列初始化节点时不同,条件队列新建节点时会把状态置为CONDITION,而同步队列则是默认值0,所以条件队列中的节点只有CONDITION和CANCELLED两种状态。然后我们再会判断下尾结点是否为null,为null说明条件队列为空,所以我们就将firstWaiter指向新的节点;如果不为null,就将尾结点的后继节点指向新节点,然后再重置lastWaiter。最后将新节点返回
addConditionWaiter方法的逻辑大概清楚了,我们再具体看下unlinkCancelledWaiters方法
1.1.1 unlinkCancelledWaiters
-
private void unlinkCancelledWaiters() {
-
Node t = firstWaiter;
-
Node trail =
null;
-
while (t !=
null) {
-
Node next = t.nextWaiter;
-
if (t.waitStatus != Node.CONDITION) {
-
t.nextWaiter =
null;
-
if (trail ==
null)
-
firstWaiter = next;
-
else
-
trail.nextWaiter = next;
-
-
if (next ==
null)
-
lastWaiter = trail;
-
}
-
else
-
trail = t;
-
t = next;
-
}
-
}
这个就是从头结点往后遍历,将Node状态为不为CONDITION的节点移除队列。这个其实是leetCode中的一个简单题,但这里还是讲下,我们维护两个指针t和trail,t指向我们当前需要检查的节点,而trail指向当前节点的前驱节点,如果当前节点需要移除队列,则将trail的后继节点指向当前节点的后继节点
1.2 fullyRelease
我们回到await方法,此时入队成功后,我们就会调用fullyRelease方法来释放当前线程所持有的锁了,我们具体看下源码
-
final long fullyRelease(Node node) {
-
boolean failed =
true;
-
try {
-
long savedState = getState();
-
if (release(savedState)) {
-
failed =
false;
-
return savedState;
-
}
else {
-
throw
new IllegalMonitorStateException();
-
}
-
}
finally {
-
if (failed)
-
node.waitStatus = Node.CANCELLED;
-
}
-
}
其中释放锁成功调用的是release方法,这个方法在"独占锁的释放"中详述过,需要注意,release除了释放线程的锁外,还会将同步队列中的第一个状态不为CANCELLED的节点中的线程唤醒。最终如果释放锁成功,我们就会将failed状态置为false,然后返回savedState状态,否则我们就会抛出异常
我们最后看下finally,如果释放锁失败,我们此线程会抛异常终止,那我们这个线程所在的节点状态就被置为CANCELLED,然后等待后面被移出条件队列,所以这也是我们在addConditonWaiter方法中为什么要检查尾结点是否为CANCELLED的原因
还需要注意的一点是release的入参savedState,这个是获取重入锁的数量,不管之前获得过多少次锁,release方法都会一起释放掉,这也是为什么这个方法起名为fullyRelease的原因
1.3 isOnSyncQueue
这个方法是查看此节点是否在同步队列中
-
final boolean isOnSyncQueue(Node node) {
-
if (node.waitStatus == Node.CONDITION || node.prev ==
null)
-
return
false;
-
if (node.next !=
null)
// If has successor, it must be on queue
-
return
true;
-
return findNodeFromTail(node);
-
}
先看第一个if语句,如果状态是CONDITION或者prev参数是null,说明此节点是在条件队列中,返回为false。我们知道,prev和next都是同步队列中使用的,所以如果两个属性不为null,说明此节点是在同步队列中,因此第二个if条件成立则需要返回true。如果两个if都不成立,说明这个节点状态是0且prev不为null,即我们在"独占锁获取"中CAS进入同步队列的情况,则我们会通过findNodeFromTail方法来确认是不是这种情况
1.3.1 findNodeFromTail
-
private boolean findNodeFromTail(Node node) {
-
Node t = tail;
-
for (;;) {
-
if (t == node)
-
return
true;
-
if (t ==
null)
-
return
false;
-
t = t.prev;
-
}
-
}
如果此时tail就是node的话,说明node在同步队列中,如果不是就像前遍历,但是这里大家可能有疑问,这个方法没有考虑到CAS失败的情况,所以可能存在遍历不到的情况,我们看下作者对这个方法的注释
-
/*
-
* node.prev can be non-null, but not yet on queue because
-
* the CAS to place it on queue can fail. So we have to
-
* traverse from tail to make sure it actually made it. It
-
* will always be near the tail in calls to this method, and
-
* unless the CAS failed (which is unlikely), it will be
-
* there, so we hardly ever traverse much.
-
*/
-
return findNodeFromTail(node);
上面说CAS失败的情况一般不太可能出现,所以这里就没考虑到这种情况了,而且就算没遍历到,外层还有一个while自旋呢
我们再回到await方法
-
// 省略
-
while (!isOnSyncQueue(node)) {
-
LockSupport.park(
this);
-
if ((interruptMode = checkInterruptWhileWaiting(node)) !=
0)
-
break;
-
}
-
// 省略
如果不在同步队列中,则此线程就被park方法阻塞了,只有当线程被唤醒才会在这里开始继续执行下面代码
四. 方法wait—唤醒后
-
public final void await() throws InterruptedException {
-
// 省略。。。。
-
while (!isOnSyncQueue(node)) {
-
// 不在同步队列中则阻塞此线程
-
LockSupport.park(
this);
// <----- 被唤醒后从下面开始
-
if ((interruptMode = checkInterruptWhileWaiting(node)) !=
0)
-
break;
-
}
-
// 被唤醒后再去获取锁
-
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
-
interruptMode = REINTERRUPT;
-
// 当线程是被中断唤醒时,node和后继节点是没有断开的
-
if (node.nextWaiter !=
null)
// clean up if cancelled
-
unlinkCancelledWaiters();
-
// 根据异常标志位对异常进行处理
-
if (interruptMode !=
0)
-
reportInterruptAfterWait(interruptMode);
-
}
我们需要注意的是,线程在这里被唤醒有两种情况,一种是其他线程调用了doSignal或doSignalAll,还有一种就是线程被中断(这两种最终都是调用了unpark方法)。因为在java中,线程被中断后并不是马上就去执行unpark操作,而是先将线程标志位置为true告诉操作系统os我需要被中断,至于os什么时候来执行中断,我们也不清楚,所以在这里,我们需要判断我们被唤醒的原因到底是因为中断还是别的线程唤醒的。这里我们通过checkInterruptWhileWaiting方法来判断,但在讲这个方法前,我们需要先了解这个interruptMode有几种状态
-
/** Mode meaning to reinterrupt on exit from wait */
-
private
static
final
int REINTERRUPT =
1;
-
/** Mode meaning to throw InterruptedException on exit from wait */
-
private
static
final
int THROW_IE = -
1;
除了上面两种,还有一种初始态0,它代表线程没有被中断过,不做任何处理。REINTERRUPT代表wait方法退出时,会重新再中断一次;而THROW_IE则代表wait方法退出时,会抛出InterruptedException异常。了解了状态后,我们来看方法
1 checkInterruptWhileWaiting
-
/**
-
* Checks for interrupt, returning THROW_IE if interrupted
-
* before signalled, REINTERRUPT if after signalled, or
-
* 0 if not interrupted.
-
*/
-
private int checkInterruptWhileWaiting(Node node) {
-
return Thread.interrupted() ?
-
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
-
0;
-
}
我们先看到注释,如果中断先于其他线程调用signal等方法唤醒的,则应该返回THROW_IE,而中断是后于其他线程调用signal等方法唤醒,则返回REINTERRUPT。
我们看下代码,代码就是一个嵌套的三元运算符,首先我们会检查中断标志位,如果interrupted方法返回false,说明没发生中断,则返回0;如果返回了true,则说明中断了,则我们需要通过transferAfterCancelledWait方法进一步检查是否发生了其他线程执行了唤醒操作
1.1 transferAfterCancelledWait
-
final boolean transferAfterCancelledWait(Node node) {
-
if (compareAndSetWaitStatus(node, Node.CONDITION,
0)) {
-
enq(node);
-
return
true;
-
}
-
-
while (!isOnSyncQueue(node))
-
Thread.yield();
-
return
false;
-
}
我们先看第一个if条件,如果条件中的CAS操作成功,说明此时的Node肯定是在条件队列中,则我们调动 enq 方法将此节点放入到同步队列中,然后返回true,但是这里需要特别注意,这个节点的nextWaiter还没置为null
如果CAS失败了,说明这个节点可能已经在同步队列中或者在入队的过程中,所以我们通过while循环等待此节点入队后返回false
我们再回到调用transferAfterCancelled的checkInterruptWhileWaiting方法中,根据transferAfterCancelledWait方法返回值我们最终会返回REINTERRUPT或THROW_IE。
然后我们返回到调用checkInterruptWhileWaiting方法的await方法中
-
public final void await() throws InterruptedException {
-
// 代码省略
-
int interruptMode =
0;
-
while (!isOnSyncQueue(node)) {
-
LockSupport.park(
this);
-
if ((interruptMode = checkInterruptWhileWaiting(node)) !=
0)
// 我们现在在这里!!!
-
break;
-
}
-
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
-
interruptMode = REINTERRUPT;
-
if (node.nextWaiter !=
null)
-
unlinkCancelledWaiters();
-
if (interruptMode !=
0)
-
reportInterruptAfterWait(interruptMode);
-
}
我们可以看到,如果返回值不为0,则直接break跳出循环,如果为0,则再次回到while条件是否检查是否在同步队列中。我们继续往下走看最后三个if语句
我们首先看第一个if语句,我们首先会通过acquireQueued方法来获取锁,这个方法在独占锁中详细讲过,但还是再贴出来稍微提一下
-
final boolean acquireQueued(final Node node, int arg) {
-
boolean failed =
true;
-
try {
-
boolean interrupted =
false;
-
for (;;) {
-
final Node p = node.predecessor();
-
if (p == head && tryAcquire(arg)) {
-
setHead(node);
-
p.next =
null;
// help GC
-
failed =
false;
-
return interrupted;
-
}
-
if (shouldParkAfterFailedAcquire(p, node) &&
-
parkAndCheckInterrupt())
-
interrupted =
true;
-
}
-
}
finally {
-
if (failed)
-
cancelAcquire(node);
-
}
-
}
这个方法首先会检查下节点是否在同步队列第一个,如果在,则会再次尝试获取锁然后成功后则会返回true,如果不在同步队列第一个或者获取锁失败了,则会去挂起,然后等待前驱结点释放锁后再被唤醒。如果在刚刚这个过程中,线程又被中断了,则interrupted则会置为true,然后最终方法返回为true(这里大家没看懂说明独占锁一节没理解,建议回看理解了再看这里)。我们再回到await方法处看if条件
-
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
-
interruptMode = REINTERRUPT;
如果在获取锁的的过程中被中断了,即acquireQueued返回true,我们再将interruptMode为0置为REINTERRUPT。其实简单来说,第一个if语句就是让节点去获取锁,并且如果在获取锁的过程中被中断了,且此线程之前没被中断过,则将interruptMode置为REINTERRUPT。
我们再来看第二个if语句
-
if (node.nextWaiter !=
null)
-
unlinkCancelledWaiters();
啥时候node的nextWaiter不是null。还记得开始说的transferAfterCancelledWait方法吗,当线程是被中断唤醒时,node和后继节点是没有断开的,这一步我们的节点中的线程已经获取锁了且从同步队列中移除了,所以我们在这里将此节点也移除条件队列,unlinkCancelledWaiters方法前面说过,它会将条件队列中所有不为CONDITION的的节点移除
好了到最后一个if语句了,到这里,线程也拿到锁了,包装线程的节点也没在同步队列和条件队列中了,所以wait方法其实已经完成了,所以现在需要对中断进行善后处理了
-
if (interruptMode !=
0)
-
reportInterruptAfterWait(interruptMode);
如果interruptMode不为0,说明线程是被中断过的,所以需要对中断进行处理,我们看下处理方法reportInterruptAfterWait
1.2 reportInterruptAfterWait
-
private void reportInterruptAfterWait(int interruptMode)
-
throws InterruptedException {
-
if (interruptMode == THROW_IE)
-
throw
new InterruptedException();
-
else
if (interruptMode == REINTERRUPT)
-
selfInterrupt();
-
}
可以看到,很简单哈,如果是THROW_IE,就是抛异常,如果是REINTERRUPT,就再自我中断一次,和获取独占锁里面原因一致
好了,wait方法就说完了,唤醒方法除了wait还有几个,这里就不再一一讲解了,大家如果认真看完wait方法,其他几个方法应该是非常容易理解的
到这里,AQS系列的文章就写完了,如果有朋友对文章有什么问题或者发现了什么错误,欢迎大家告诉我:)
(完)
欢迎大家关注我的公众号 “程序员进阶之路”,里面记录了一个非科班程序员的成长之路
转载:https://blog.csdn.net/qq_36582604/article/details/105849161