什么是AQS?AQS有什么用呢?
本篇文章主要就是解决这两个问题,并且附上源码解析。
AQS 的全称是 AbstactQueuedSynchronizer 即抽象队列同步器。
可能大部分使用Java语言的同学都知道它,因为他是面试的高频问题之一,面试Android也会问这样的问题,我自己就被问了好几次。
java并发包下很多API都是基于AQS来实现的加锁和释放锁等功能的,AQS是java并发包的基础类。比如:ReetrantLock ,ReentrantReadWriteLock 都是基于AQS来实现的。
ReentrantLock 实现加锁和锁释放就是通过AQS来实现的。
先看一段代码:
private void doTask1(){
try {
reentrantLock.lock();
Log.e("aqs", "doTask1 获得锁");
Thread.sleep(3 * 1000);
// doTask2();
}catch (Exception e){
e.printStackTrace();
}finally {
reentrantLock.unlock();
Log.e("aqs", "doTask1 释放锁");
}
}
如上,如果一个线程调用 lock 会发生什么呢?
其实这个不难,不要一涉及到AQS就觉得很难。AQS 中维护了一个很重要的变量 state, 它是int型的,表示加锁的状态,初始状态值为0;另外 AQS 还维护了一个很重要的变量exclusiveOwnerThread,它表示的是获得锁的线程,也叫独占线程。AQS中还有一个用来存储获取锁失败线程的队列,以及head 和 tail 结点,包含 如下图所示
这时,线程1 跑过来调用ReentrantLock的lock()方法尝试进行加锁,这个加锁的过程,直接就是用CAS操作将state值从0变为1。如果对CAS操作不理解的话,可以看看我之前的文章:我对CAS的理解和用法
如果这时候没有其他的线程操作,那么CAS操作肯定是成功的,然后设置 exclusiveOwnerThread 为 当前线程。lock的代码如下(这里是以默认的非公平锁为例):
final void lock() {
//通过CAS操作 ,如果当前的state 等于0 那么 cas 就会操作成功,返回true,表示当前线程成功获取锁
if (compareAndSetState(0, 1))
//设置exclusiveOwnerThread 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
我们从ReentrantLock的名字就可以知道它是可以重入的,那么它的重入是怎么实现的呢?对的,就是跟 state 和 exclusiveOwnerThread 有关,具体是怎么样的呢?看下面的例子
private void doTask1(){
try {
reentrantLock.lock();
Thread.sleep(3 * 1000);
doTask2();
}catch (Exception e){
e.printStackTrace();
}finally {
reentrantLock.unlock();
}
}
private void doTask2(){
try {
reentrantLock.lock();
Thread.sleep(10 * 1000);
}catch (Exception e){
e.printStackTrace();
}finally {
reentrantLock.unlock();
}
}
线程 先执行 doTask1,然后doTask1中执行doTask2,由于是同一个线程和同一把锁,所以就可以重入了。
具体流程就是:线程执行到doTask2的时候, 执行 lock 发现 state 已经不是0而是1了,然后检查 当前线程是不是和获取锁的线程是同一个,结果发现是同一个,所以 state+1 = 2,这就是可重入的核心原理。源码如下,在 ReentranLock 中,具体的调用关系 我就不列出来了。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果 state = 0,那么 通过cas来操作获取锁,跟之前的流程一样,这里为什么还要执行同样的操作呢?因为可能执行到这里的时候,上一个线程刚好执行完,state-- 等于0
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//这里就是可重入的逻辑呢,
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
//小于0表示可重入的次数大于int型最大值,产生溢出了。
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
那么 此时,如果线程2 来调用 reentrntlock.lock()方法来获取锁会是什么样子的呢?
线程2跑过来一下看到,发现state的值不是0啊?所以CAS操作将state从0变为1的过程会失败,因为state的值当前为1,说明已经有人加锁了!
接着线程2会看一下,是不是自己之前加的锁啊?当然不是了,exclusiveOwnerThread这个变量明确记录了是线程1占用了这个锁,所以线程2此时就是加锁失败。
加锁失败后是怎么操作的呢? 加锁失败后 ,此时就要将自己放入队列中来等待,等待线程1释放锁之后,自己就可以重新尝试加锁了。
具体的代码如下:
public final void acquire(int arg) {
// tryAcquire 就是调用上面nonfairTryAcquire,由于是线程1没有释放,所以线程2 调用tryAcquire返回false, 接着调用 acquireQueued方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireQueued 中先调用 addWaiter ,addWaiter 代码如下:
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
//表示等待队列里有其他的线程在等待了,然后就是设置node为尾结点
if (oldTail != null) {
//当前的node的PREV 指向 尾结点oldTail
U.putObject(node, Node.PREV, oldTail);
//把尾结点设置当前的node结点
if (compareAndSetTail(oldTail, node)) {
//之前的尾结点的next指向node
oldTail.next = node;
return node;
}
} else {//如果之前的等待队列没有等待的线程,那么new一个node,让head和tail指向这个new出来的结点
initializeSyncQueue();
}
}
}
上面的把node结点设置为尾结点的操作不知道大家看明白没?我画个图来说明下:
U.putObject(node, Node.PREV, oldTail);
compareAndSetTail(oldTail, node) 的操作如下,也是通过cas完成的,
oldTail.next = node; 就很简单了,如下:
通过上面的3部操作就可以把 获取锁失败的线程放到等待队列的尾部。
接着看看 acquireQueued的源码,如下:
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
//判断之前的结点是不是头结点 head,如果是头结点就尝试去获取锁,
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获取锁成功的话,就把当前线程设置为head
setHead(node);
//断开之前头结点
p.next = null; // help GC
return interrupted;
}
//如果之前的不是头结点,那么就要等待了,等候之前的线程释放锁后,调用 LockSupport来唤醒,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
接下来先看 shouldParkAfterFailedAcquire,如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 注意Node的waitStatus字段我们在上面创建Node的时候并没有指定 ,默认值是0
// waitStatus 的4种状态
//static final int CANCELLED = 1;
//static final int SIGNAL = -1; //等待被唤醒
//static final int CONDITION = -2; //条件锁使用
//static final int PROPAGATE = -3; //共享锁时使用
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// 如果 ws > 0,则表示是取消状态,然后通过while循环 把所有是取消状态的线程从等待队列中删除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {//如果不是取消状态,则通过cas操作将该线程的waitStatus设置为等待唤醒状态
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
上面的shouldParkAfterFailedAcquire方法只是将waitStatus设置为SIGNAL,但是并没有阻塞操作,真正的阻塞操作在下面的方法 parkAndCheckInterrupt,如下:
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程,底层实现是unsafe
LockSupport.park(this);
//返回当前线程是否被中断
return Thread.interrupted();
}
这里对lock方法作以下总结:
- 当线程1调用lock方法时,首先看 AQS 的 state 是否为0,如果是0的话,通过cas操作将state置为1,并且设置独占线程为当前线程
- 如果这时候线程1 要调用另外一个lock方法,就像我上面的例子那样,那么线程1会发现 state = 1,它再去看独占线程是不是就是自己,如果是的话 state + 1 ,获取锁成功。
- 如果线程1 执行的方法还没有完成即锁还没有释放,此时线程2调用lock方法,由于线程1没有释放锁,那么state不会等于0,且独占线程是线程1而不是自己(线程2),所以AQS会把线程2放到等待队列的尾部,如果线程2的前置结点是头结点head,那么线程2会通过死循环一直去获取锁,如果不是头结点那么就会阻塞线程2,等待线程1释放锁且唤醒它。
这里可能有点饶,看下我画的图。
线程锁释放是怎么样的呢?
我们知道是调用 unlock来实现的,具体是什么样的呢?其实很简单 就是将 state-- 直到state = 0,然后通过 LockSupport.unpark()来唤醒等待队列中的下一个结点。具体的看源码:
public void unlock() {
sync.release(1);
}
这个没啥子好说的,接着调用AQS的 release方法,如下:
public final boolean release(int arg) {
//如果释放当前线程成功的话,那么就去唤醒等待队列的头结点
if (tryRelease(arg)) {
Node h = head;
//头结点不为空且waitStatus不等于0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
来看看 tryRelease 方法,实在Reentrantlock 中实现的,如下:
protected final boolean tryRelease(int releases) {
//state - 1
int c = getState() - releases;
//如果当前线程不是之前设置的独占线程则抛出锁状态异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//如果c == 0表示当前线程已经释放锁了,然后设置独占线程为null,如果不等于0说明当前线程执行了可重入操作,等可重入的方法执行完 调用 unlock方法,会执行本方法 state会等于0
free = true;
setExclusiveOwnerThread(null);
}
//state 值置为 0
setState(c);
return free;
}
接下来看看 unparkSuccessor ,如下:
private void unparkSuccessor(Node node) {
//node 表示是头结点,如果头结点的 waitStatue < 0,则置为0,
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
//s 表示的是头结点的下一个结点。为什么是唤醒下一个结点而不是头结点呢?
//因为我们上面调用addWaiter方法的时候,如果等待队列里面没有等待线程,那么直接
//new 一个Node 然后 head 和 tail 都指向这个 node,换句话说这个头结点只是用来占位的,所以要从头结点的下一个结点开始唤醒
Node s = node.next;
//waitStatus 大于0 表示该线程已经取消了,
if (s == null || s.waitStatus > 0) {
s = null;
//从队列的尾部开始遍历,找到一个waitStatue 小于等于0的线程来唤醒
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)//唤醒线程
LockSupport.unpark(s.thread);
}
结语:AQS 结合 ReentrantLock的加锁和解锁已经介绍完了,有问题可以一起交流交流啊!
转载:https://blog.csdn.net/zhujiangtaotaise/article/details/105843274