前言
前面我们通过ReentranLock的源码分析了AQS的独占模式的获取和释放,通过分析源码我们大概的了解的AQS的独占模式功能的实现,这次的话我们就来聊聊AQS的共享模式。如果你已经理解了我们前面说的独占式,那对于理解共享式也是很简单的。
AQS的共享模式VS独占模式
在说AQS共享模式之前,我想先来阐述一下我对共享模式和独占模式的理解。
其实我一开始在理解共享模式的时候也是有点难以理解的,大部分的人把共享模式称为共享锁,但是我尝试使用共享锁这个概念去理解,很多地方都没办法说的通。在看源码的时候,用独占锁概念去代入理解的时候还算可以理解,但是用共享锁去代入理解的话,个人感觉理解起来有些难受,不是特别好好懂。这里针对共享锁
我想换一个概念,使用条件许可
来解释。
我们可以把类似上图代码修饰的代码块看做一个锁,锁住的区域中有一个或者多个许可,我们知道ReenTranLock是独占模式,所以我们可以理解为ReenTranLock中只有一个可用许可
,当一个线程已经得到了许可,那其他线程只能被阻塞,只能等到已经拿到许可的那个线程释放了许可,才能去尝试获取许可。还有一个比较重要的点,是独占模式下AQS会记录下来占有许可的那个线程,而共享模式是不会记录这个的,纯粹的是利用state的状态来进行判断的。
而Semaphore和CountDownLatch的实现是共享模式,虽然都是共享模式,但是还是有点区别的,Semaphore在实例化的时候传入的值,我们可以认为就是有多少个可用的许可,只要state不等于0,线程就可以去访问被修饰的代码块。而CountDownLatch却有些不一样,我们初始化的时候给的值,不是代表可用的许可,我们可以理解为不可用许可
,因为这里它更像是一个阀门,只要state不等于0,所有的线程都要阻塞在await()方法上,只有当调用countDown方法,使state等于0,所有阻塞的线程才能被唤醒,继续执行后续的代码。
为什么理解成条件许可
呢,因为AQS无论是共享模式还是独占模式,都是建立在一定的条件上,这个条件就是对state值的判定,在独占模式下我们是不能去给state赋值的,默认只有一个许可,但是对于同一个线程是可以重入的。共享模式下我们是可以给state进行赋值的,state的值是多少就代表有几个许可能让线程获取,只有获取许可的线程才能访问被保护的代码块。但是CountDownLatch的实现是有点不太一样的,我们可以把CountDownLatch中的state值看作是不可用许可
只有不可用许可的数量为0才能让线程通过。这里我想重点强调的一点是,在共享模式下,获取的不是锁
这个概念,而获取的是许可
,以上就是我个人对这两个概念的理解,如果有误的话多谢指出。
以下是AQS共享模式和独占模式的方法对比,接下来我们就通过代码具体来看一下。
独占锁 | 共享锁 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
Example: CoutDownLatch
我们就先以CoutDownLatch为例,来看看共享模式的大概实现。
CountDownLath的结构
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
...
}
protected boolean tryReleaseShared(int releases) {
...
}
}
private final Sync sync;
public CountDownLatch(int count) {
...
}
public void await() throws InterruptedException {
...
}
public boolean await(long timeout, TimeUnit unit)
...
}
public void countDown() {
...
}
//获取AQS中state的值
public long getCount() {
...
}
我们通过源码看到方法也就这几个,类中也是有一个内部类继承了AQS,重写了tryAcquireShared 和 tryReleaseShared,实现的逻辑也是相当的简单,比较核心的方法也就两个,一个是countDown方法,每调用一次,就会将当前的state值减一,当state值为0时,就会唤醒所有等待中的线程;另一个是await方法,它有两种形式,一种是阻塞式,一种是带超时机制的形式,如果state的不为0,挂起调用await方法的线程,直到state等于0,唤醒所有等待的线程。
获取许可
await
该方法是阻塞式地等待,并且是响应中断的。
public void await() throws InterruptedException {
//acquireSharedInterruptibly()和acquireShared()方法实现都是差不太多的,一个是支持中断,一个不支持中断而已。我们就不单独去分许acquireShared()了。
sync.acquireSharedInterruptibly(1);
}
await()方法直接调用了AQS中的acquireSharedInterruptibly方法
//这个方法是AQS中的
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
这个方法就是CountdonwLatch 重写的 AQS中的tryAcquireShared方法。
protected int tryAcquireShared(int acquires) {
//我们可看到这里直接就是判断state的是否是等于0的,等于0返回的是1,不等于0返回的是-1.
return (getState() == 0) ? 1 : -1;
}
这里能看到tryAcquireShared方法返回的是一个int类型的值,这里通过判断当前的state值是否是等于0,来决定返回1 还是 -1。当tryAcquireShared返回的值小于 0 说明目前还有state值不等于0,不等于0代表还有我们前面说的 不可用许可
。
doAcquireSharedInterruptibly
这里我们可以看到,当tryAcquireShared返回 -1说明state值不等于0,就执行doAcquireSharedInterruptibly方法,将线程封装成Node节点,加入等待队列中。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//其实这里的acquireQueued方法里面有两个参数,这里addWaiter放在了方法里面来.
//在说独占模式的时候addWaiter我们也说过了,这一步主要是将没有抢到许可的的线程加入到队列中。但是此时线程还没有被挂起。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//经过addWaiter方法,已经将阻塞的线程封装成Node节点,添加到了队列中
//判断当前节点的前驱节点是不是头结点。
final Node p = node.predecessor();
if (p == head) {
//if (p == head && tryAcquire(arg))这个是独占模式中acquireQueued对应的一段,前面我们也说过
//只不过tryAcquire返回的是一个boolean值而已。
//因为刚添加到队列中的节点是阻塞队列中的第一个,我们可以再次去看一下statede值是否等于0.
int r = tryAcquireShared(arg);
//如果返回的值大于0说明state值的为0。后续的线程可以直接通过了。
if (r >= 0) {
//此时tryAcquireShared返回值大于0说明state已经等于0了。
//这个方法也是一个重点的方法,单独拿出来说。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//当前Node不是在CLH队列的第一位或者是当前线程获取许可失败,判断是否需要把当前线程挂起。检查线程是否已经中断,如果中断直接抛出异常。这里我们在前面文章也说过了,这里可以去参考前面独占模式说的。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果代码走到cancelAcquire说明肯定是抛出了InterruptedException异常,因为上面是一个死循环,退出循环的两个途径,一个是走了return,另外一个就是抛出了异常,如果是正常return,failed应该是false,不会执行cancelAcquire
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
我们通过对比一下独占模式中的方法,能够发现独占模式中的方法,如果当前节点是在阻塞队列的第一个,就将头结点指向当前节点,只是单纯的将头结点指向了当前的节点。
我们知道当调用setHeadAndPropagate的条件就是tryAcquireShared返回的值大于0,我们知道tryAcquireShared是交给实现的具体类去实现的,所以我们理解tryAcquireShared返回的含义,还是要看具体的实现类是什么,就仅仅在AQS中对于tryAcquireShared返回的值大于0我们可以理解为存在可以获取的许可数。
setHeadAndPropagate方法除了将头结点指向的当前节点以外,还在一定的条件下进行doReleaseShared(),这个方法本来是应该在释放许可的时候才会出现的,去唤醒等待队列中的线程,但是在这里出现了。
private void setHeadAndPropagate(Node node, int propagate) {
// 记录了之前旧的头结点
Node h = head;
//将当前节点设置为头节点。因为state = 0,而且Node节点在等待队里的第一个,可以把Node设为头结点了。
setHead(node);
//如果走到了这个方法说明propagate是大于0的。
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//如果说当前节点后面没有了其他节点,且节点是共享节点,那就去执行doReleaseShared
Node s = node.next;
if (s == null || s.isShared())
//这里就是释放许可,然后唤醒等待队列中的线程。
doReleaseShared();
}
}
这里我不太明白为什么要去判断旧的头结点的状态。看了他给的注释,没有想太明白为什么要这样去判断呢。
其实我还有一个疑问的,上面我们知道调用setHeadAndPropagate的前提是Node是在队列的第一个,tryAcquireShared返回的值是大于0的。调用setHeadAndPropagate方法将Node节点设为头结点我是理解的,但是为什么还要去调用doReleaseShared去唤醒等待的线程呢,因为此时阻塞队列里面只有一Node节点,而且此时Node节点还没有被挂起。
带着上面的问题我们继续往下看doReleaseShared这个方法,这里我们就不对这个方法进行详细解释了,到下面分析释放许可的时候再具体分析。这里我主要想说明,为什么会在await()方法中调用这个方法。
// 调用这个方法的时候,state == 0
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
我们通过doReleaseShared方法可以发现unparkSuccessor是唤醒被挂起的线程,调用这个方法需要满足h != null && h != tail
,前面调用addWaiter方法的时候将Node节点加入到了队列中,对队列进行了初始化,此时tail = node,如下图左边的一样。
我们调用 setHead(node)的时候又将 head = node ,变成了右图所示,那此时 tail = head 的。不会执行unparkSuccessor方法。
此时h == head 头结点没有发生变化也就退出了循环。所以说在doAcquireSharedInterruptibly这个方法中调用的setHeadAndPropagate方法,只是将node节点设置成了头结点,没有执行unparkSuccessor()。所以说在这里和独占模式下setHead()方法的作用是一样的。
释放许可
当执行 CountDownLatch 的 countDown()方法,将计数器减一,也就是state减一,当减到0的时候,等待队列中的线程被释放。我们一起来看一下
countDown
public void countDown() {
sync.releaseShared(1);
}
使用的是releaseShared(int arg)来释放许可。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
这个方法很简单,用自旋的方法实现 state 减 1,如果本来state的值就是0,那就返回false,返回false也就意味个releaseShared方法不会再doReleaseShared()方法唤醒等待线程了,因为state已经为0了。只有当 state -1 等于0,才会返回true,去唤醒等待队列中的线程。
protected boolean tryReleaseShared(int releases) {
//使用了一个死循环,当state == 0 或者 CAS设置state的值成功以后退出循环。
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/**这里我们对比一下Semaphore中的tryReleaseShared,这个方法是一个死循环,除非抛出异常,不然最后肯定是返回true的。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
*/
doReleaseShared
countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程
/**
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
*/
//在CountDownLatch中,tryReleaseShared(arg)返回true,只有当state等于0。
private void doReleaseShared() {
for (;;) {
Node h = head;
// 1. h == null: 说明阻塞队列为空
// 2. h == tail: 说明头结点可能是刚刚初始化的头节点, 或者是普通线程节点,但是此节点既然是头节点了,那么 代表已经被唤醒了,阻塞队列没有其他节点了
// 所以这两种情况不需要进行唤醒后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 判断头节点是否是正常的状态。后继节点需要被唤醒
if (ws == Node.SIGNAL) {
// 这里 CAS 失败的场景请看下面的解读
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是这里,唤醒 head 的后继节点。我们上一篇文章也说了这个方法,这里就不在说了。
unparkSuccessor(h);
}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
该方法是一个自旋操作(for(;;)
),退出该方法的唯一办法是走最后的break语句:
if (h == head) // loop if head changed
break;
即,只有在当前head没有易主时,才会退出,否则继续循环。这个怎么理解呢?
为了说明问题,这里我们假设目前sync queue队列中依次排列有
dummy node -> A -> B -> C -> D
现在假设A或许可以成功,则它将成为新的dummy node,
dummy node (A) -> B -> C -> D
A线程被唤醒了,唤醒以后此时线程还结束呢,唤醒以后又回到了被挂起的地方。
这里是一个for(;;)
的死循环,退出的唯一就是从return 结束。此时线程A会将自己设置为了头节点。
唤醒后继的节点B,它很快获得了共享锁,成为了新的头节点:
dummy node (B) -> C -> D
此时,B线程也会调用doReleaseShared,我们写做doReleaseShared[B]
,在该方法中将唤醒后继的节点C,但是别忘了,在doReleaseShared[B]
调用的时候,doReleaseShared[A]
还没运行结束呢,当它运行到if(h == head)
时,发现头节点现在已经变了,所以它将继续回到for循环中,与此同时,doReleaseShared[B]
也没闲着,它在执行过程中也进入到了for循环中。。。
大量的线程在同时执行doReleaseShared,这极大地加速了唤醒后继节点的速度,提升了效率,同时该方法内部的CAS操作又保证了多个线程同时唤醒一个节点时,只有一个线程能操作成功。
那如果这里A线程执行结束时,节点B还没有成为新的头节点时,A方法不就退出了吗?是的,但即使这样也没有关系,因为它已经成功唤醒了线程B,即使A线程退出了,当B线程成为新的头节点时,它也会负责唤醒后继节点的。
到这里我们就把AQS的共享模式分析了一遍,小伙伴们可以自己好好的捋一捋这个流程。
总结
- 共享模式的调用框架和独占模式很相似,我们可以把独占模式看作内部只允许有一个可用许可,共享模式可以允许有多个可用的许可。
- 共享模式和独占模式还有一个最大的区别就是,独占模式会用一个变量记录获取许可的线程是哪一个,而共享模式是不记录这个的,完全依赖于state变量的值。
写在最后
上次写完独占模式那篇文章以后,就一直在加班,五一之前已经连续两个星期没有休息了,五一又加了两天班,实在最近感觉没有什么状态,回去以后也没有精力写东西,抽时间总算又磕磕绊绊的写完了这篇文章,如果有写的不对的地方希望可以帮忙指出,我会第一时间修改。
希望看完以后有收获的小伙伴帮忙点个赞鼓励一下呗,你的认可是对我最大的鼓励。
转载:https://blog.csdn.net/qq_21269947/article/details/105932631