小言_互联网的博客

JAVA并发编程-5-AQS的实现原理

639人阅读  评论(0)

上一章看这里:JAVA并发编程-4-显式锁Lock

一、LockSupport工具的使用

先来简单讲解下LockSupport的使用

java在LockSupport中定义了一组静态方法,来提供最基本的线程操作,包括阻塞一个线程,唤醒一个线程,构架同步组件等。前面讲到的CountDownLatch、Semaphore信号量中其实都有LockSupport的使用

LockSupport中最重要的两个方法是

  • public static void park()
  • public static void unpark(Thread thread)

park意为停车,即阻塞线程,unpark即为唤醒线程

park和unpark中都调用了UNSAFE中的本地方法实现。

二、初识AQS

1、什么是AQS

AQS即为AbstractQueuedSynchronizer类,一看就知道是一个抽象类,Queue意为队列,Synchronizer是同步器,字面来看AQS就是一个抽象的队列同步器

JUC包下定义AQS是来方便我们实现之前讲的显示锁Lock,CountDownLatch等并发工具的,内部使用了先进先出队列来实现开发,作者是我们之前提到的Doug Lea,他希望AQS能成为大部分同步需求的开发标准和基础

2、模版方法设计模式

我们知道AQS是一个抽象类,必须有类来继承它为我们提供使用。而在AQS中使用了模版方法设计模式来规范子类的实现和使用。 所谓模版方法设计模式就是在父类中定义了需要子类覆盖实现的流程方法,和规定流程方法怎样执行的骨架模版方法,使用的时候只需调用模版方法就可以完成操作。

来看个简单的小例子,假设我们有发送短信、发送邮件、发送消息等需求。这些需求总是要定义发给谁 to(),从哪里发出去的from(),发送的信息content(),发送过程send()等每个需求都不一样的方法,则在父类中定义子类实现自己的业务,也需要date() 公共方法在父类中直接实现。

模版方法sendMessage()调用了每个流程发放,定义了整个发送过程。子类实现这个类后,使用类只需调用sendMessage()即可完成发送流程。

public abstract class SendCustom {
    /**
     * 流程方法
     */
    public abstract void to();

    public abstract void from();

    public abstract void content();

    public void date() {
        System.out.println(new Date());
    }

    public abstract void send();

    //框架方法-模板方法
    public void sendMessage() {
        to();
        from();
        content();
        date();
        send();
    }
}

模版方法模式使用的目的其实就是为了解耦合,定义好每个流程方法用于子类实现,这一点是面向锁的实现者,去实现每一步的加锁方法。模版方法供调用者使用,使面向锁的调用者,屏蔽了使用的锁细节。

3、AQS中的方法

模版方法:

  • 独占式获取锁 acquire acquireInterruptibly(可中断获取) tryAcquireNanos(超时获取)
  • 独占式释放锁 release
  • 共享式获取锁 acquireShared acquireSharedInterruptibly tryAcquireSharedNanos
  • 共享式释放锁 releaseShared

流程方法:

  • 独占式获取 tryAcquire
  • 独占式释放 tryRelease
  • 共享式获取 tryAcquireShared
  • 共享式释放 tryReleaseShared
  • 同步器是否处于独占模式 isHeldExclusively

另外提供了 private volatile int state 来标记同步状态,标记锁的取得和释放

  • getState() 获取当前的同步状态
  • setgetState(int newState) 设置当前同步状态
  • compareAndSetState(int expect, int update) cas原子操作设置状态

4、实现一个独占锁

下面我们先自己尝试实现一个独占锁:
模拟ReentrantLock的实现,SelfLock类本身只实现Lock接口,需要一个内部类来继承AbstractQueuedSynchronizer,实现流程方法isHeldExclusively、tryAcquire、tryRelease。SelfLock中实现Lock接口的方法

public class SelfLock implements Lock {

    //state 表示获取到锁 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到
    private static class Sync extends AbstractQueuedSynchronizer {

        //是否占用
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
 

   protected boolean tryAcquire(int arg) {
        //原子操作来改值
        if (compareAndSetState(0, 1)) {
            //表示当前线程拿到了这把锁
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                //说明锁没有被占用,抛出一个异常
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sycn = new Sync();

    @Override
    public void lock() {
        sycn.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sycn.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sycn.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sycn.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sycn.release(1);

    }

    @Override
    public Condition newCondition() {
        return sycn.newCondition();
    }
}

需要注意的一点,加锁时的方法tryAcquire需要原子操作compareAndSetState,因为可能有多个线程来竞争锁。解锁的方法setState(0)不需要原子操作,因为只有当前线程持有锁。

三、深入AQS的结构和源码

对于锁,假设10个线程同时去获取一把锁,AQS保证只有一个线程拿到了锁,那么其它9个线程在做什么呢,它们又是怎样依次拿到锁的呢,我们来研究下AQS的源码寻找答案!

1、AQS中的数据结构—节点和同步队列


对于没有拿到锁的线程,即tryAcquire()方法返回false的,同步器会将对应线程及其它的一些信息构造成一个节点,加入到一个同步队列里面。
这个同步队列先进先出,保证先进入队列的节点先被唤醒。同时,这个队列是一个双向链表,节点有指针指向前序节点prev和后续节点next。在同步器中还有两个指示器,一个头部指示器head,一个尾部指示器tail,分别指向这个队列的头节点和尾节点。

2、节点Node

Node是AQS中的一个内部类,Node中有几个状态来表明当前存入该节点的线程目前处于一种什么状态。分别是:

       //线程等待超时或者被中断了,需要从队列中移走
       static final int CANCELLED =  1;
       //后续的节点等待状态,当前节点如果完成释放了锁,就通知后面的节点去运行
        static final int SIGNAL    = -1;
        //当前节点处于等待队列
        static final int CONDITION = -2;
        //用在共享中,表示状态要往后面的节点传播
        static final int PROPAGATE = -3;

       //另外还有一个状态0,表示当前节点处于初始状态

这些值会被赋值给属性变量 volatile int waitStatus;
另外的几个属性:

	   //前序节点
       volatile Node prev;
       //后序节点
       volatile Node next;
       //在节点中等待的线程
       volatile Thread thread;
       //用在Condition等待队列中
       Node nextWaiter;

3、节点在同步队列中的增加和移除

什么时候需要打包成节点往同步队列中去增加呢,显而易见,即当前线程去拿锁时没有拿到,需要加入到队列中去等待。


假设1线程持有了锁没有释放,后面来的线程2、3、4就会依次加入到同步队列,构成一个双向链表,同步器通过CAS依次设置尾节点。如果线程1释放了,则会找到2并唤醒它,则2就会成为同步队列的头节点持有锁。再来一个线程5,则加入尾节点。这就是节点在同步队列变化的过程。

4、独占式同步状态获取与释放


流程上大体还是上述过程,拿不到锁就加入同步队列尾部,然后判断前序节点是否为头节点,如果不是就进入等待状态。当前序节点变为头节点时会唤醒后面节点,去尝试获取同步状态,如果前序节点没有释放锁,它又会进入等待状态,不停的自旋获取同步状态,直到获取成功当前节点成为头节点。

获取同步状态的方法是模版方法acquire(int arg)

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先调用tryAcquire()方法,这是自己要实现的流程方法,我们在上面是这样实现的

   protected boolean tryAcquire(int arg) {
            //原子操作来改值
            if (compareAndSetState(0, 1)) {
                //表示当前线程拿到了这把锁
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

如果tryAcquire返回了true,则acquire直接退出,否则要执行acquireQueued,这里面调用了addWaiter方法,addWaiter的作用就是把线程打包成Node加入到同步队列中。

    private Node addWaiter(Node mode) {
        //将线程包装成Node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

分析下这段代码,首先将当前线程包装成节点Node,然后判断当前队列的尾节点,如果不为空,就令Node的前序节点为原来的尾节点tail,再原子操作的去把同步器的尾节点设为node,如果设置成功,把原来的尾节点的next节点设为node后返回。如果尾节点是空的或者设置同步器的尾节点为Node失败,就要执行 enq(node);

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法中不断的在自旋。如果tail是空的,说明当前同步队列是空的,new Node()放到队列的头节点再令尾节点等于头节点继续进入下次循环。如果尾节点不为空,令node前序节点为tail,再进行原子操作的设置尾节点,不断循环直到成功。这样最终node节点会被添加到队列尾部。

addWaiter方法最终返回了Node,需要继续执行acquireQueued方法。我们想一下这个方法需要干什么呢?根据上面的流程图,返回节点后当前线程的节点需要在同步队列中进行等待了。

   final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里面也是在不断的自旋。首先通过predecessor方法拿到前序节点,如果前序节点是头节点,会tryAcquire再次拿锁,如果成功了,把自己设置成头节点, p.next = null把前序节点脱钩(使其不在队列)。如果前序节点不是头节点或者拿锁失败,会执行parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

该方法中调用了 LockSupport.park(this);,将线程自己阻塞了。
这就是线程加入同步队列的过程。

接下来是释放锁的过程,释放锁我们调用的是模版方法release方法

   public void unlock() {
        sycn.release(1);
    }

来看下release的实现:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release中调用流程方法tryRelease解锁成功后,拿到头节点并且节点等待状态不为0初始状态时,进入 unparkSuccessor(h);

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

在unparkSuccessor中可以看到最终拿到了已解锁节点的后序节点,并且调用LockSupport.unpark唤醒了它。

5、共享式同步状态获取与释放

共享式的获取锁的方法是acquireShared:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

调用流程方法tryAcquireShared,是需要子类去覆盖的方法。当tryAcquireShared返回值小于0,意味着当前线程没有获取到锁,反之则是获取锁。如果没有获取则进入doAcquireShared,共享状态获取锁的过程:

   private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个方法跟独占式获取锁的方法大同小异,不同的一个点是会调用setHeadAndPropagate方法将获取锁的状态向后面的线程传播,通过Node的waitStatus = PROPAGATE来进行传播。详细代码不再赘述,大家可以自己研究下。

6、独占式超时同步状态获取

同样超时获取的核心方法也和独占式的差不多:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里面有经典的等待超时机制,即每被唤醒一次后,判断当前条件如果还不满足,就减去相应时间后重新设置等待时间继续等待。大家可以结合独占式获取锁自己研究下这个代码。

四、Condition分析

在上一章中我们介绍了显示锁的等待通知机制Condition,等待通知机制是要和锁配合使用的,现在我们来看下它的实现。
我们知道,一个Lock可以有多个Condition,通过newCondition创建。在每一个Condition对象中都包含了一个等待队列,类似AQS中的等待队列:

我们发现它跟AQS中的等待队列还是很像的,都是将线程包装成一个Node放到队列中;Condition中都有指向头节点和尾节点的指针。但是,它是一个单向链表

对于一个AQS对象,它有一个同步队列,并且可以有多个等待队列,如图:

一个完整的AQS实例化后就是上面这个样子喽!

当我们调用了Condition的await方法的时候,会将节点从同步队列移动到等待队列中:

当调用Condition的signal方法的时候,就会去唤醒Codition等待队列中中的头节点,唤醒后就需要尝试着去竞争锁,因此会被加入同步队列的尾部。

五、再看Lock的实现

1、锁的可重入

上面我们自己实现的锁是不支持可重入的,同一个线程再次调用tryAcquire方法时,锁已经被占用导致加锁失败。那可重入锁ReentrantLock是怎么实现的呢?

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

从上面代码我们可以看到,在第3行拿到了一个状态,第11行在判断当前线程是已经持有锁的线程后,将这个状态进行累加,再把值同步回去。

同样在释放锁的时候会将对应的值减去对应值,下面代码第2行。

     protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

由此实现了锁的可重入。

2、锁的公平和非公平

公平锁和非公平锁的实现跟上面介绍的原理差不多,只是公平锁的实现中会多加入一个方法:

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

这个方法会判定前面是否有前序节点,如果有前序节点,则一定不会去先尝试拿锁,而是保证会加入到队列的尾部。

而非公平锁不会有这个判定,执行到加锁方法时总是会先尝试去拿一下锁,如果失败再加入同步队列。

由此,可以说非公平锁的效率是要优于公平锁的。因为如果按队列要唤醒下一个线程来拿锁,唤醒线程牵扯到上下文切换所需要的时钟周期并不短,可能在这个时间内刚来的抢先获得锁的线程都已经执行结束了。

3、读写锁ReentrantReadWriteLock的实现

这里非常值得说的一个问题是:一个AQS中只有一个状态来标记锁的当前状态,那一个状态怎么来标记两个锁(读锁和写锁)的状态呢?

我们直到一个int类型的state是有32个字节的,读写锁会将低16位字节用于保存写的状态,高16位字节用于保存读的状态。

还有一点不同的是,由于写锁只能有一个,锁重入时,直接在低16位的值➕锁的次数。而读锁是可以有多个线程持有的,高16位用来保存持有锁的线程个数,而可重入次数是在ThreadLocal中保存。

     static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

总结:本章学习了AQS,相对于之前的工具类的使用是比较偏向于原理的东西,需要老铁们多看几遍,跟着源码更加深入的理解AQS的实现


转载:https://blog.csdn.net/weixin_40292704/article/details/105645002
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场