浅析抽象队列同步器(AQS)
目录
除非我不想赢,否则没人能让我输。
复习多线程并发包总结
什么是AQS
AQS(AbstractQueuedSynchronizer)是一个抽象队列同步器,通过维护一个共享资源状态(volatile int state)和一个FIFO线程等待队列(底层是双向链表)来实现一个多线程访问共享资源的同步框架。许多同步类的实现都依赖于AQS,例如常用的ReentrantLock,CountDownLatch,Semaphore。
关于ReentrantLock,CountDownLatch,Semaphore的用法可参考:
[常用的三种同步类] https://blog.csdn.net/qq_42107430/article/details/103854488
JDK1.8源码:
-
/**
-
* The synchronization state.
-
*/
-
private volatile int state;
-
-
static final class Node {
-
static final Node SHARED = new Node();
-
static final Node EXCLUSIVE = null;
-
static final int CANCELLED = 1;
-
static final int SIGNAL = -1;
-
static final int CONDITION = -2;
-
static final int PROPAGATE = -3;
-
volatile int waitStatus;
-
volatile Node prev; //前驱节点
-
volatile Node next; //后继节点
-
volatile Thread thread;//当前线程
-
Node nextWaiter; //存储在condition队列中的后继节点
-
//是否为共享锁
-
final boolean isShared() {
-
return nextWaiter == SHARED;
-
}
-
-
final Node predecessor() throws NullPointerException {
-
Node p = prev;
-
if (p == null)
-
throw new NullPointerException();
-
else
-
return p;
-
}
-
-
Node() { // Used to establish initial head or SHARED marker
-
}
-
//将线程构造成一个Node,添加到等待队列
-
Node(Thread thread, Node mode) { // Used by addWaiter
-
this.nextWaiter = mode;
-
this.thread = thread;
-
}
-
//这个方法会在Condition队列使用,后续单独写一篇文章分析condition
-
Node(Thread thread, int waitStatus) { // Used by Condition
-
this.waitStatus = waitStatus;
-
this.thread = thread;
-
}
-
}
AQS的原理
AQS为每个共享资源变量设置一个共享资源锁,线程在需要访问共享资源时首先需要去获取共享资源缩。
如果获取成功,便可以在当前线程中使用该共享资源,如果获取不成功,则将该线程放入线程等待队列,等待下一次资源调度。
state状态
AQS维护了一个volatile int 类型的变量,用于表示当前的同步状态。Volatile虽然不能保证操作的原子性,但是可以保证操作的可见性。
state的访问方式有以下三种,均是原子操作
-
getState()
-
setState()
-
compareAndSetState()
-
/**
-
* Returns the current value of synchronization state.
-
* This operation has memory semantics of a {@code volatile} read.
-
* @return current state value
-
*/
-
protected final int getState() {
-
return state;
-
}
-
-
/**
-
* Sets the value of synchronization state.
-
* This operation has memory semantics of a {@code volatile} write.
-
* @param newState the new state value
-
*/
-
protected final void setState(int newState) {
-
state = newState;
-
}
-
-
/**
-
* Atomically sets synchronization state to the given updated
-
* value if the current state value equals the expected value.
-
* This operation has memory semantics of a {@code volatile} read
-
* and write.
-
*
-
* @param expect the expected value
-
* @param update the new value
-
* @return {@code true} if successful. False return indicates that the actual
-
* value was not equal to the expected value.
-
*/
-
protected final boolean compareAndSetState(int expect, int update) {
-
// See below for intrinsics setup to support this
-
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
-
}
AQS的共享资源状态:独占式和共享式
AQS定义两种资源共享方式:独占式(Exclusive)和共享式(Share)。
-
独占式:只有一个线程能执行,如ReentrantLock
-
共享式:共享,多个线程可同时执行,如Semaphore/CountDownLatch
AQS只是一个框架,定义了一个接口,具体的资源获取、释放都交由自定义同步器实现。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
-
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
-
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
-
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
-
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
-
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。否则程序运行时会报错。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续执行后续动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
添加锁和释放锁
当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化。
添加节点
添加节点时会涉及到两个变化
-
新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己
-
通过CAS讲tail重新指向新的尾部节点
移除节点
head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点
这个过程也是涉及到两个变化
-
修改head节点指向下一个获得锁的节点
-
新的获得锁的节点,将prev的指针指向null
了解了AQS是什么,原理实现后,我们结合ReentrantLock来深入理解AQS是如何实现线程安全的。
什么是ReentrantLock
Java中除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
实现
ReentrantLock继承了Lock接口并实现了在接口中定义的方法。是一个可重入的独占锁。通过自定义抽象队列同步器来实现。
Lock接口JDK源码
-
void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
-
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常
-
boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回true
-
boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法
-
void unlock() // 释放锁
如何使用
-
public class ReentrantLockDemo {
-
private static int count=0;
-
static Lock lock=new ReentrantLock();
-
public static void inc(){
-
lock.lock();
-
try {
-
Thread.sleep(1);
-
count++;
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}finally{
-
lock.unlock();
-
}
-
}
这段代码主要做一件事,就是通过一个静态的incr()
方法对共享变量count
做连续递增,在没有加同步锁的情况下多线程访问这个方法一定会存在线程安全问题。所以用到了ReentrantLock
来实现同步锁,并且在finally语句块中显式释放锁。
底层实现
ReentrantLock.lock()
-
public void lock() {
-
sync.lock();
-
}
可以看到lock()方法底层调用的是sync的lock()方法。
sync是一个静态内部类,通过继承AQS并实现了共享资源state的获取和释放的方式。
Sync
-
abstract static class Sync extends AbstractQueuedSynchronizer {
-
private static final long serialVersionUID = -5179523762034025860L;
-
-
/*定义一个抽象方法,由具体的子类去实现*/
-
abstract void lock();
-
-
/**
-
* 实现非公平的tryAcquire获取资源
-
*/
-
final boolean nonfairTryAcquire(int acquires) {
-
final Thread current = Thread.currentThread();//获取当前线程
-
int c = getState();//获取同步状态
-
if (c == 0) {//如果状态为0 CAS设置acquires
-
if (compareAndSetState(0, acquires)) {
-
setExclusiveOwnerThread(current);//如果设置成功,设置当前线程为排他所有者线程
-
return true;
-
}
-
}
-
else if (current == getExclusiveOwnerThread()) {//如果当前线程就是排他线程
-
int nextc = c + acquires;
-
if (nextc
< 0) // overflow 溢出报错
-
throw new Error("Maximum lock count exceeded");
-
setState(nextc);//重新设置state
-
return true;
-
}
-
return false;
-
}
-
-
/*
-
* 尝试释放资源
-
*/
-
protected final boolean tryRelease(int releases) {
-
//计算要更新的同步状态
-
int c = getState() - releases;
-
//如果当前线程不是排他线程 报错
-
if (Thread.currentThread() != getExclusiveOwnerThread())
-
throw new IllegalMonitorStateException();
-
boolean free = false;
-
//如果状态为0,设置独占排他线程为null,返回true
-
if (c == 0) {
-
free = true;
-
setExclusiveOwnerThread(null);
-
}
-
//更新同步状态
-
setState(c);
-
return free;
-
}
-
-
protected final boolean isHeldExclusively() {
-
// While we must in general read state before owner,
-
// we don't need to do so to check if current thread is owner
-
return getExclusiveOwnerThread() == Thread.currentThread();
-
}
-
-
/*获取当前线程*/
-
final Thread getOwner() {
-
return getState() == 0 ? null : getExclusiveOwnerThread();
-
}
-
-
/*获取当前state状态*/
-
final int getHoldCount() {
-
return isHeldExclusively() ? getState() : 0;
-
}
-
-
/*判断是否被锁定*/
-
final boolean isLocked() {
-
return getState() != 0;
-
}
-
-
}
Sync又有两个具体的实现,分别是NofairSync(非公平锁),FairSync(公平锁)。
-
公平锁 表示所有线程严格按照FIFO来获取锁
-
非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁
NofairSync
-
-
/**
-
* Sync object for non-fair locks 非公平锁
-
*/
-
static final class NonfairSync extends Sync {
-
private static final long serialVersionUID = 7316153563782823691L;
-
-
/**
-
* 执行锁定
-
*/
-
final void lock() {
-
//首先通过CAS设置state,如果成功,设置当前线程为排他线程(非公平的关键)
-
if (compareAndSetState(0, 1))
-
setExclusiveOwnerThread(Thread.currentThread());
-
else//如果失败再去尝试获得锁 关于acquire的具体讲解在下面
-
acquire(1);
-
}
-
-
/*调用父类sync的非公平tryAcquire获取资源*/
-
protected final boolean tryAcquire(int acquires) {
-
return nonfairTryAcquire(acquires);
-
}
-
}
lock()方法简单解释一下
-
由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁
-
如果抢占锁成功,保存获得锁成功的当前线程
-
抢占锁失败,调用acquire来走锁竞争逻辑
compareAndSetState调用的是Unsafe类的compareAndSetState方法进行原子操作
return unsafe.compareAndSetState(this, stateOffset, expect, update);
UnsafeUnsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
这个是一个native方法, 第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值var4相等,则更新为新的期望值 var5,如果更新成功,则返回true,否则返回false;
FairSync
-
/**
-
* Sync object for fair locks
-
*/
-
static final class FairSync extends Sync {
-
private static final long serialVersionUID = -3000897897090466540L;
-
-
//尝试获取锁
-
final void lock() {
-
acquire(1);
-
}
-
-
/**
-
* 公平版本的tryAcquire
-
* Fair version of tryAcquire.
-
*/
-
protected final boolean tryAcquire(int acquires) {
-
final Thread current = Thread.currentThread();
-
int c = getState();
-
if (c == 0) {
-
if (!hasQueuedPredecessors() &&
-
compareAndSetState(0, acquires)) {
-
setExclusiveOwnerThread(current);
-
return true;
-
}
-
}
-
else if (current == getExclusiveOwnerThread()) {
-
int nextc = c + acquires;
-
if (nextc
< 0)
-
throw new Error("Maximum lock count exceeded");
-
setState(nextc);
-
return true;
-
}
-
return false;
-
}
-
}
acquire
acquire是AQS中的方法,如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作,这里大家思考一下,acquire方法中的1的参数是用来做什么呢?如果没猜中,往前面回顾一下state这个概念
-
public final void acquire(int arg) {
-
if (!tryAcquire(arg) &&
-
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
-
selfInterrupt();
-
}
这个方法的主要逻辑是
-
通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
-
如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
-
acquireQueued,将Node作为参数,通过自旋去尝试获取锁。
addWaiter
-
private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
-
//将当前线程封装成Node,并且mode为独占锁
-
Node node = new Node(Thread.currentThread(), mode);
-
// Try the fast path of enq; backup to full enq on failure
-
// tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
-
Node pred = tail;
-
if (pred != null) { //tail不为空的情况,说明队列中存在节点数据
-
node.prev = pred; //讲当前线程的Node的prev节点指向tail
-
if (compareAndSetTail(pred, node)) {//通过cas讲node添加到AQS队列
-
pred.next = node;//cas成功,把旧的tail的next指针指向新的tail
-
return node;
-
}
-
}
-
enq(node); //tail=null,将node添加到同步队列中
-
return node;
-
}
ReentrantLock.unlock()
-
public void unlock() {
-
sync.release(1);
-
}
release
1 释放锁 ;2 唤醒park的线程
-
public final boolean release(int arg) {
-
if (tryRelease(arg)) {
-
Node h = head;
-
if (h != null && h.waitStatus != 0)
-
unparkSuccessor(h);
-
return true;
-
}
-
return false;
-
}
tryRelease
这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。
-
protected final boolean tryRelease(int releases) {
-
int c = getState() - releases; // 这里是将锁的数量减1
-
if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
-
throw new IllegalMonitorStateException();
-
boolean free = false;
-
if (c == 0) {
-
// 由于重入的关系,不是每次释放锁c都等于0,
-
// 直到最后一次释放锁时,才会把当前线程释放
-
free = true;
-
setExclusiveOwnerThread(null);
-
}
-
setState(c);
-
return free;
-
}
unparkSuccessor
在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程
-
private void unparkSuccessor(Node node) {
-
int ws = node.waitStatus;
-
if (ws
< 0)
-
compareAndSetWaitStatus(
node,
ws,
0);
-
Node
s =
node.next;
-
if (
s ==
null ||
s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态,
-
s = null;
-
for (Node t = tail; t != null && t != node; t = t.prev)
-
if (t.waitStatus
<= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的
-
s = t;
-
}
-
//内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁
-
if (s != null)
-
LockSupport.unpark(s.thread); //释放许可
-
}
转载:https://blog.csdn.net/qq_42107430/article/details/105630706