飞道的博客

高并发之JUC——AQS源码深度分析(一)

325人阅读  评论(0)

不要只满足于目前的安逸,傻傻的在工作中写CRUD了。因为在目前疫情面前,企业依然朝不保夕。我们不能温水煮青蛙,只有时刻保持一颗学习的心,拥有硬核能力,时刻心如止水,才能立于不败之地。

这是高并发JUC的系列的第一篇。AbstractQueuedSynchronizer简称AQS,它是JUC工具类的基石,JUC包下的类都直接或间接的继承了AbstractQueuedSynchronizer,而JUC是jdk自带的实现高并发的辅助工具。所以我们首先分析AQS源码,才能更好的学习JUC并发包中的其他工具类。

AQS数据结构

AQS的核心就是双向链表+LockSupport+CAS,那么很显然AQS是用双向链表来作为其数据结构的:

上面图中就是AQS内部的双向同步队列。那么这个队列的作用是什么呢?就是当线程请求锁时,当没有获取到锁,那么就将当前请求线程加入到这个同步队列中,从而有机会竞争锁。至于请求加入和移除队列后队列中节点的变化,图中文字部分已经说明了。

下面来看下看下AQS源码中是如何定义这个双向链表的:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        // 标记该等待的节点是共享模式下的节点
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        // 标记该等待的节点是独占模式的节点
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        // 标记线程状态是被取消状态
        static final int CANCELLED =  1;
        
        /** waitStatus value to indicate successor's thread needs unparking */
        // 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
        static final int SIGNAL    = -1;
        
        /** waitStatus value to indicate thread is waiting on condition */
        // 标示该节点是条件队列节点
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        // 标示为传播节点
        static final int PROPAGATE = -3;
        
        /**
         * 节点状态:CANCELLED、SIGNAL、CONDITION、PROPAGATE
         */
        volatile int waitStatus;

		// 下一个节点
        volatile Node next;

		// 节点对应的线程
        volatile Thread thread;

        // 下一个节点(指在条件队列中的)
        Node nextWaiter;

		// 是否是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

		// 取得前继节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }
		
		/**
		 *  常用的构造函数
		 *  thread:节点所属线程
		 *  mode:节点类型(独占/共享)
		 */
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

上面是AQS的内部类,定义了链表的数据结构。上面的代码中都有注解说明,大家应该都应该能看得懂。

主要是SHARED和EXCLUSIVE两个节点,分别是共享节点和独占节点,独占就是多个线程可以持有同一把锁,共享节点就是只能有一个线程持有这把锁。

还有就是节点的几个状态,需要关注,这个在后续文章中讲解工具类时会分析到。这里先了解就好。下面对AQS中核心方法做详细说明。

AQS获取锁acquire

acquire是AQS的核心方法,指的是获取锁,AQS的子类lock获取锁的时候,会直接调用AQS的acquire这个方法来实现。

原理

锁竞争原理:

上图是锁竞争流程,以下做文字说明:

1、当一个加锁的请求线程过来执行lock,那么没有其他任何线程,那么直接加锁成功;否则进入步骤 2(步骤2是一个自旋操作(挂起当前线程或一直自旋来阻塞当前请求线程),直到获取到锁后返回。)

2、当一个请求过来加锁,当获取锁失败,那么直接将待获取锁的线程加入到等待队列,加到队尾(节点状态为0);然后判断当前待获取锁的节点的前继节点是不是头节点,如果是头节点,说明当前节点的前继(头节点)节点正在持有锁或者是那个占位的空节点,那么此时当前请求加锁的线程就再一次尝试获取锁,如果获取锁成功,那么就将当前线程在等待队列中的节点位置变为头节点。以上是成功获取到锁。不成功则执行步骤3;

3、如果直接获取锁失败,或者当前请求的线程在等待队列中的前继节点不是头节点,那么就执行如下逻辑:
3.1 若前继节点的状态为SIGNAL,则说明前继节点正在等待唤醒,那么直接挂起当前请求线程;
3.2 否则前继节点的状态为>0,那么肯定是为canceled,取消状态。那么就从当前请求线程的节点开始向前递归遍历,直到找到节点状态不为canceled的节点 ,然后将当前请求的节点作为这个节点的后继节点(也就是说忽略了被canceled的请求节点,那些被canceled的节点也就等待被垃圾回收)。否则如果当前节点的前继节点不是SIGNAL或>0,那么将前继节点状态置为SIGNAL,然后将当前请求线程挂起,等待其他线程释放锁,唤醒当前挂起的线程。

源码

acquire

public final void acquire(int arg) {
    // tryAcquire由子类实现;addWaiter添加到队列;acquireQueued排队
    // tryAcquire尝试获取锁,这个由子类实现。当获取锁失败后,执行acquireQueued方法。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋。
            for (;;) {
                // 判断当前线程节点是不是第一个等待线程,如果是,并且获取锁成功,将当前线程的节点置为head(其实头节点就是一个占位的空节点)。
                // 如果上述操作没有成功或不满足是第一个等待线程,那么就会执行shouldParkAfterFailedAcquire方法。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 根据前继节点状态来判断当前节点对应的线程是否被阻塞挂起,如果需要挂起,那么将当前线程阻塞挂起。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // ws是当前等待线程对应的节点的前面一个节点的状态。
        int ws = pred.waitStatus;

        // ws = SIGNAL,说明node对应的线程需要挂起并等待唤醒,所以将当前node的线程挂起,等待唤醒。
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // ws>0为取消状态,也就是这个节点对于的线程已经cancel,所以也就不需要获取锁了。
        // 那么此时就会将当前node的prev指向上上个节点,这是一个do while操作,也就是判断当前node的前面的所有节点是不是都取消了,
        // 如果取消了,那么当前node就在队列中向前移动。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 前继节点既不是被取消,也不是SIGNAL,当前node加到等待队列后,将当前节点的前面一个节点的状态置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
        // 挂起当前线程
        LockSupport.park(this);
        // 返回当前线程打断状态(是否被打断)
        return Thread.interrupted();
    }

AQS释放锁release

原理

当线程不需要同步锁时,需要释放锁,让其他线程有机会竞争到锁。释放锁原理:

上图是释放锁流程,下面进行文字说明:

1、 执行tryRelease释放锁。当释放锁失败则返回false。否则返回true执行步骤2;
2、当释放锁成功后,取得头节点(此时头节点可能是当前释放的线程),然后判断头节点是否为空并且状态是否不等于0(存在等待获取锁的请求线程),如果等待队列中存在节点,那么获取状态不等与0的第一个后继节点,然后将其解阻塞,这样就回到了acquire中的步骤3,挂起的线程将会被打断,继续向下执行,当然从步骤3会执行回步骤2继续执行,直到成功获取到锁。

源码

release

public final boolean release(int arg) {
    // 释放锁,此方法由具体子类实现。如果释放锁成功,那么执行if里面的逻辑。
    if (tryRelease(arg)) {
        Node h = head;
        // 释放锁成功,判断队列中头节点是不是状态不为0(状态为0是初始化状态)
        // 如果头节点存在,并且不是初始化状态,那么解阻塞
        if (h != null && h.waitStatus != 0)
            // 将后继节点解阻塞。
            unparkSuccessor(h);
        return true;
    }
    return false;
}

unparkSuccessor

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // 将当前传入的节点状态设置为0,这里也就是要释放锁的节点。
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 取得当前队列中的第一个等待节点(并且状态<=0,就是可以竞争锁的状态)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 解阻塞,即可以去竞争锁
        LockSupport.unpark(s.thread);
}

以上是独占锁的锁竞争和释放锁的源码分析,到这里相信大家已经基本了解了其处理机制。不过不懂也没关系,读源码是一个享受孤独的过程,需要静下心来,一点点读,不能急于求成。多读多写,慢慢就会有感觉了。

下一篇文章将继续分析AbstractQueuedSynchronizer中关于共享锁的竞争锁和释放锁机制及带超时时间的共享锁、中断共享锁的加锁及释放锁的原理分析。

最后

如果本篇对你有用,欢迎点赞、关注、转载,由于水平有限,如有问题请留言。如果大家对我的技术文章分享感兴趣,我可以在钉钉直播或其他渠道和给大家分享关于源码解析及各种技术应用的讲解。


转载:https://blog.csdn.net/qq_20878967/article/details/106163790
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场