0
点赞
收藏
分享

微信扫一扫

精通Java并发编程N:深入理解AQS的实现原理、源码分析


文章目录

  • ​​一、AQS原理​​
  • ​​1、AQS是什么?​​
  • ​​2、AQS的核心:CLH队列​​
  • ​​3、以独占锁同步状态的获取和释放为例:​​
  • ​​4、从具体的代码实现来看​​
  • ​​1)互斥锁​​
  • ​​**<1>** `acquire()`获取互斥锁​​
  • ​​**<2>** `release()`释放互斥锁​​
  • ​​2)共享锁​​
  • ​​<3> `acquireShared()`获取共享锁​​
  • ​​<4> `releaseShared()`释放共享锁​​
  • ​​5、面试题2:AQS中线程Node节点的PROPAGATE状态是什么意思?​​
  • ​​二、AQS源码分析​​
  • ​​1、acquire()获取互斥锁​​
  • ​​1> addWaiter()方法​​
  • ​​1)面试题3:为什么CAS可以保证线程安全?​​
  • ​​2)面试题4:是否存在通过head引用遍历不到最后一个节点的情况?​​
  • ​​2> acquireQueued()方法​​
  • ​​1)面试题5:为什么添加到阻塞队列后,要先尝试获取锁?​​
  • ​​1>> shouldParkAfterFailedAcquire()方法​​
  • ​​2>> parkAndCheckInterrupt()方法​​
  • ​​2、release()释放互斥锁​​
  • ​​1> unparkSuccessor()方法​​
  • ​​3、acquireShared()获取共享锁​​
  • ​​1> doAcquireShared()方法​​
  • ​​1>> setHeadAndPropagate()方法​​
  • ​​4、 releaseShared()释放共享锁​​
  • ​​1> doReleaseShared()方法​​
  • ​​5、 acquireInterruptibly()以响应中断的方式获取互斥锁​​
  • ​​总结​​

一、AQS原理

1、AQS是什么?

它是用来构建锁或者其他同步组件的基础框架。

从它的名拆开来看:

  1. Abstract:因为它不知道怎么上锁,所以采用​​模板方法​​暴露出上锁的逻辑。

    • AQS的主要使用方式是​​继承​​,子类通过继承AQS同步器并实现它的抽象方法来管理同步状态。

      2、AQS的核心:CLH队列

      AQS里面的CLH队列是CLH同步锁的一种变形。AQS依赖内部的这个同步队列(FIFO)来完成同步状态的管理。

      精通Java并发编程N:深入理解AQS的实现原理、源码分析_java

      AQS的核心就是用CAS去操作这个同步队列的head和tail,也就是用CAS操作代替了锁整条双向链表的操作,所以它效率高。

      • 1> 当线程获取同步状态失败时,AQS会将< 当前线程、等待状态等信息>构建成一个​​Node​​节点通过​​addWaiter()​​方法加入到阻塞队列的尾部,同时会阻塞当前线程。
      • Node节点的内容:获取同步状态失败的线程引用、等待状态、前驱和后继节点等。
      • AQS的​​addWaiter()​​方法是基于CAS的设置尾节点,其中:
      • compareAndSetTail(Node expext, Node update),保证节点入队的线程安全性。
      • 最后会在enq(fianl Node node)方法中,通过“死循环”来保证节点的正确添加。
      • 2> 当同步状态释放时,会把首节点的第一个活跃的后继节点线程唤醒,使其再次尝试获取同步状态。
      • 唤醒操作只需要将首节点设置成为原首节点的后续节点,断开原节点的next引用即可。

      3、以独占锁同步状态的获取和释放为例:

      精通Java并发编程N:深入理解AQS的实现原理、源码分析_互斥锁_02

      精通Java并发编程N:深入理解AQS的实现原理、源码分析_共享锁_03

      • 在使用​​tryAcquire()​​方法获取同步状态时,AQS维护了一个同步阻塞队列,获取同步状态失败的线程都会通过CAS被加入到队列中并在队列中进行自旋。
      • 线程移出队列(停止自旋)的前提是前驱节点是头节点且成功获取了同步状态。
      • 释放同步状态时,AQS调用​​tryRelease(int arg)​​方法释放同步状态,然后唤醒头节点的后继节点。
      • 注意:维护同步队列的FIFO原则,只有前驱节点是头结点的节点才能获取同步状态。

      4、从具体的代码实现来看

      此处做一个总述,具体细节见后面的源码分析

      1)互斥锁

      <1> acquire()获取互斥锁
      • acquire()方法中有一个模板方法​​tryAcquire()​​,用于给子类实现调用,判断是否获取锁成功。如果获取失败:
      1. 第一步先调用​​addWaiter()​​方法,通过CAS把当前线程节点封装成​​Node​​阻塞节点,放到AQS的阻塞队列中;
      • ​addWaiter()​​方法是基于CAS的设置尾节点,其中:
      • compareAndSetTail(Node expext, Node update),保证节点入队的线程安全性。
      • 最后会在enq(fianl Node node)方法中,通过“死循环”来保证节点的正确添加。
      1. 第二步,放到AQS队列中后,调用​​acquireQueued()​​方法尝试去获取一下锁,如果获取锁成功,则把当前线程节点设置为head节点。
      • 如果获取锁失败,则调用​​shouldParkAfterFailedAcquire()​​方法,判断当前线程节点是否可以阻塞;
      • 即从当前线程节点一直往前找节点、直到找到一个SIGNAL的活跃节点,并把当前线程节点作为它的后继节点。
      1. 最后以响应中断的方式阻塞当前线程节点。
      <2> release()释放互斥锁
      • release()方法中有一个模板方法​​tryRelease()​​,用于给子类实现调用,判断是否释放锁成功,如果失败直接返回FALSE,否者:
      • 调用​​unparkSuccessor()​​方法唤醒head节点的有效后继节点,即:从head节点开始往后找到第一个有效后继节点,唤醒。
      • 不过,找后继节点时,如果head的第一个后继节点是个无效节点,需要从​​tail​​节点开始往前遍历找到第一有效的后继节点。
      • 因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。

      2)共享锁

      <3> acquireShared()获取共享锁
      • acquireShared()方法中有一个模板方法​​tryAcquireShared()​​,用于给子类实现调用,判断是否获取锁成功,如果获取成功直接返回,否者:
      1. 调用​​doAcquireShared()​​方法阻塞当前线程节点,其中包括:
      • 第一步:和​​tryAcquire()​​方法一样的调用​​addWaiter()​​方法,通过CAS把当前线程节点封装成​​Node​​阻塞节点,放到AQS的阻塞队列中;只是这里Node节点的类型是SHARED。
      • 第二步和​​tryAcquire()​​方法类似,尝试获取共享锁;
      • 首先判断当前Node节点的前驱节点是不是头结点,如果是:
      • 调用子类实现的​​tryAcquireShared()​​​方法尝试获取锁,如果获取锁成功,则将当前Node节点设置为head节点,并且调用​​doReleaseShared()​​方法唤醒后继的SHARED类型的共享Node节点。
      • 第三步:继续往下走,如果线程获取锁失败,则采用响应中断的方式使用LockSupport.park(this)方法阻塞当前线程。
      • 最后,如果在尝试获取锁的过程中失败,则调用​​cancelAcquire()​​方法取消当前线程尝试获取共享锁的操作。将节点从竞争队列中移除,节点状态设置为CANCELLED、引用的线程对象置为null。
      <4> releaseShared()释放共享锁
      • releaseShared()方法中有一个模板方法​​tryReleaseShared()​​,用于给子类实现调用,判断是否释放锁成功,如果失败直接返回FALSE,否者:
      • 调用​​doReleaseShared()​​方法唤醒head节点的有效后继节点,即:从head节点开始往后找到第一个有效后继节点,唤醒。
      • 不过,找后继节点时,如果head的第一个后继节点是个无效节点,需要从​​tail​​节点开始往前遍历找到第一有效的后继节点。
      • 因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。

      5、面试题2:AQS中线程Node节点的PROPAGATE状态是什么意思?

      在共享锁中,线程调用​​doReleaseShared()​​方法执行唤醒时,方法的代码段不是互斥的,所以为了保证线程的正确唤醒,也就有了这么一个中间状态。

      以一个场景为例:Semaphore有两个许可,A线程和B线程都获取到了许可,C线程和D线程在等着。

      • 假如A和B同时执行、释放锁,A在​​doReleaseShared()​​中CAS成功,将head节点B状态设置为0,并通过​​unparkSuccessor()​​方法去唤醒C。
      • C被唤醒之后,它去抢锁,并且抢到锁了,在​​doAcquireShared()​​中有可能执行到了​​setHeadAndPropagate()​​方法去​​设置头结点为自己​​,也有​​可能没执行到​​,即可能将head节点从B更新到了C,可能还是B。这时我们再去​​看线程B释放锁​​的情况。
      • 此时线程B他获取到的​​head节点就有两种情况​​:

      二、AQS源码分析

      1、acquire()获取互斥锁

      public final void acquire(int arg) {
      if (!tryAcquire(arg) && // 子类中判断获取锁失败返回false,这里取反则为true
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter()方法为:获取锁失败后,加入到阻塞队列。
      // 进入阻塞阻塞队列之后,考虑线程是否睡眠?
      selfInterrupt();
      }

      // 子类实现获取锁的逻辑, AQS并不知道你用这个state来上锁
      protected boolean tryAcquire(int arg) {
      throw new UnsupportedOperationException();
      }

      1> addWaiter()方法

      获取互斥锁失败后,首先调用​​addWaiter()​​​方法,通过CAS把当前线程节点封装成​​Node​​阻塞节点,放到AQS的阻塞队列中;

      addWaiter()方法中基于CAS的设置尾节点,将阻塞线程节点放入阻塞队列;

      保证节点入队的线程安全性;

      并采用全路径 + ​​优化前置​​(把enq()方法中for循环里的一些判断前置到addWaiter()方法)的技巧,实现快速入队。

      private Node addWaiter(Node mode) {
      // 创建等待节点:当前线程对象 + 锁模式(互斥锁?共享锁?)
      Node node = new Node(Thread.currentThread(), mode);
      // 快速加入到队列
      Node pred = tail;
      if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      //(面试重点,最难理解的地方)特别注意:当上面的CAS成功之后,有一瞬间pred.next并没有关联。这样会导致什么问题?
      // 那一瞬间,我们通过head引用遍历的时候,是到达不了最后一个节点的,A(heade) -> B(旧tail) <- C(新tail)
      // 如何获取到最新的节点? 直接通过tail指针往前遍历即可。
      pred.next = node;
      return node;
      }
      }
      // "死循环"确保节点正确添加
      enq(node);
      return node;
      }

      // 全路径入队方法
      private Node enq(final Node node) {
      for (;;) {
      Node t = tail;
      if (t == null) { // 懒加载时,tail 和head分别代表aqs的头尾节点
      // 通过CAS实现原子初始化操作;当一个空节点实现初始化,
      if (compareAndSetHead(new Node()))
      tail = head;
      } else {
      node.prev = t;
      // CAS更新tail节点,期望是t,想改成node
      if (compareAndSetTail(t, node)) {
      t.next = node;
      return t;
      }
      }
      }
      }

      1)面试题3:为什么CAS可以保证线程安全?
      1. 正常将一个节点添加到队列尾部需要三个操作:
      2)面试题4:是否存在通过head引用遍历不到最后一个节点的情况?

      存在,因为:

      在addWaiter()方法将newNode节点设置为新tail节点之后,才会将旧tail节点的next引用指向newNode。

      • 如果在设置旧tail.next = newNode之前,我们通过head引用遍历,这一瞬间是遍历不到最后一个节点的。

      ​如何获取到最新的节点? ​

      • 直接通过tail指针往前遍历即可。

      2> acquireQueued()方法

      获取互斥锁失败后,其次调用​​acquireQueued()​​方法尝试再获取一次锁,进而决定是否将当前线程进行阻塞。

      // node节点为添加到阻塞队列中的线程节点
      @ReservedStackAccess
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      // 获取当前线程节点的前驱节点
      final Node p = node.predecessor();
      // 如果前驱节点是头结点(持有锁的线程节点),这是有可能头结点释放了锁,所以尝试tryAcquire()让子类抢一下锁
      if (p == head && tryAcquire(arg)) {
      // 获取锁成功,更新头结点;释放原头结点的next引用,帮助GC
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
      }
      // 如果前驱节点不是头结点 或 抢锁失败,需要先判断线程是否应该被阻塞? 因为有可能上一瞬间抢锁失败,但是到这的时候头结点已经释放了锁。
      // 如何阻塞呢? 调用parkAndCheckInterrupt()方法来阻塞当前线程。
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      // 假如上面出现了任何异常,或者 没有获取到锁;取消获取锁操作
      if (failed)
      cancelAcquire(node);
      }
      }

      1)面试题5:为什么添加到阻塞队列后,要先尝试获取锁?

      因为如果添加到阻塞队列后,当前锁状态是无锁的话,就不会有活跃线程获取到锁。

      1>> shouldParkAfterFailedAcquire()方法

      如果前驱节点不是头结点 或 抢锁失败,需要先调用​​shouldParkAfterFailedAcquire()​​方法判断线程是否应该被阻塞?

      • 也就是找到一个可靠的(活着有效的)节点,然后将当前线程节点作为其后继节点即可。

      // pred为前驱节点,node为当前线程节点
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      // 获取前驱节点的状态
      int ws = pred.waitStatus;
      // 如果前驱节点是SIGNAL,此时当前线程节点可以安全的阻塞
      // 为什么可以安全阻塞? 因为SIGNAL状态,代表了上一个线程是活的,它可以通知当前线程节点,所以当前节点是可以安全的阻塞了。
      if (ws == Node.SIGNAL)
      return true;
      if (ws > 0) {
      // 前驱节点是CALLCEL无效节点时,怎么办? 一直往前找,直到找到有效节点。
      do {
      // 取前驱节点的前驱节点,赋值给当前线程节点的前驱节点
      node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      // 更新 新的前驱节点的next指针指向
      pred.next = node;
      } else {
      // 前驱节点是个正常节点时,使用CAS将其变为SIGNAL。
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
      }

      2>> parkAndCheckInterrupt()方法

      如果前驱节点不是头结点 或 抢锁失败,并且可以阻塞当前线程,则调用​​parkAndCheckInterrupt()​​方法以响应式中断的方法阻塞当前线程。

      private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);
      // 判断是否是通过中断的方式来唤醒。 包括:1、unpark 2、interrupt
      return Thread.interrupted();
      }

      2、release()释放互斥锁

      public final boolean release(int arg) {
      // 子类尝试释放锁
      if (tryRelease(arg)) {
      Node h = head;
      if (h != null //
      && h.waitStatus != 0) // 那就是h的状态为SINGAL
      // 唤醒阻塞队列中的后继线程
      unparkSuccessor(h);
      return true;
      }
      return false;
      }

      // 子类实现释放锁的逻辑,AQS并不知道。
      protected boolean tryRelease(int arg) {
      throw new UnsupportedOperationException();
      }

      1> unparkSuccessor()方法

      该方法用于释放node节点后的有效后继节点,即:从node节点开始往后找到有效后继节点,唤醒。

      找后继节点时,如果后继节点是个无效节点,需要从tail开始往前遍历找到第一有效的后继节点。

      因为因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。

      private void unparkSuccessor(Node node) {
      // 获取当前节点(head)的状态
      int ws = node.waitStatus;
      // ws的状态为SIGNAL
      if (ws < 0)
      // CAS将其变为0,代表了我当前已经响应了这一次的唤醒操作
      compareAndSetWaitStatus(node, ws, 0);

      // 取当前节点的后继节点,作为唤醒节点;但是注意条件
      Node s = node.next;
      if (s == null // 为什么后继节点可能为nul?因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以有可能一瞬间为空。
      || s.waitStatus > 0) { // 后继节点是个无效节点!
      // 将后继节点设置为空
      s = null;
      // 因为tail引用一定是最新的引用,所以从后往前找,一定能找到 第一个(node节点的第一个后继) 有效节点
      for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
      // 更新后继节点
      s = t;
      }
      // 此时的s就是要唤醒的节点
      if (s != null)
      // 唤醒后继节点
      LockSupport.unpark(s.thread);
      }

      3、acquireShared()获取共享锁

      public final void acquireShared(int arg) {
      // 由子类判断是否获取锁成功,若不成功?则该阻塞阻塞去,<0表示获取锁失败
      if (tryAcquireShared(arg) < 0)
      // 阻塞当前线程节点
      doAcquireShared(arg);
      }

      // 由子类实现获取锁的逻辑,返回值表示还剩几个可以获取的锁资源
      protected int tryAcquireShared(int arg) {
      throw new UnsupportedOperationException();
      }

      1> doAcquireShared()方法

      获取共享锁失败后,首先调用​​doAcquireShared()​​方法,阻塞当前线程节点;其中包括:

      1. 和​​tryAcquire()​​​方法一样的调用​​addWaiter()​​​方法,通过CAS把当前线程节点封装成​​Node​​阻塞节点,放到AQS的阻塞队列中;只是这里Node节点的类型是SHARED。
      2. 和​​tryAcquire()​​方法类似,尝试获取共享锁;
      3. 继续往下走,如果线程获取锁失败,则采用响应中断的方式使用LockSupport.park(this)方法阻塞当前线程。
      4. 最后,如果在尝试获取锁的过程中失败,则调用​​cancelAcquire()​​方法取消当前线程尝试获取共享锁的操作。

      private void doAcquireShared(int arg) {
      // 和互斥锁一样,只是这里的是共享模式
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      // 前驱节点是head节点时,可能头结点很快就释放锁了,尝试获取锁
      final Node p = node.predecessor();
      if (p == head) {
      int r = tryAcquireShared(arg);
      // 获取锁成功;尝试唤醒后面的共享节点,因为共享锁是可以多线程共享的
      if (r >= 0) {
      // 将当前节点更新为head节点,然后唤醒后继节点
      setHeadAndPropagate(node, r);
      // p.next指针已经没有用了
      p.next = null; // help GC
      // 如果在等待过程中发生了中断,由于中断被唤醒,则会置位当前线程的中断标志位。
      if (interrupted)
      selfInterrupt();
      failed = false;
      return;
      }
      }
      if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应该阻塞获取锁失败的线程节点?因为有可能上一瞬间抢锁失败,但是到这的时候头结点已经释放了锁。
      parkAndCheckInterrupt()) // 以响应中断的方式阻塞当前线程节点
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }

      1>> setHeadAndPropagate()方法

      获取共享锁成功后,会调用​​setHeadAndPropagate()​​方法将当前节点更新为head节点,并唤醒后继节点;

      private void setHeadAndPropagate(Node node, int propagate) {
      // 保存旧head结点的快照
      Node h = head;
      setHead(node);
      if (propagate > 0 // 信号量有多余,则直接唤醒
      || h == null // 不可能发生
      || h.waitStatus < 0 // SIGNAL,表明必须唤醒后继节点,则直接唤醒
      || (h = head) == null // 不可能发生,但是将最新的head节点引用赋给h是有意义的
      || h.waitStatus < 0) { // SIGNAL,表明必须唤醒后继节点,则直接唤醒
      // 最新head节点的后继节点
      Node s = node.next;
      if (s == null || s.isShared())
      doReleaseShared();
      }
      }

      在调用​​setHeadAndPropagate()​​​方法将当前节点更新为head节点,并唤醒后继节点时,如果当前节点没有后继节点了,则调用​​doReleaseShared()​​方法释放共享锁;

      ​doReleaseShared()​​​方法在下面的​​releaseShared()​​方法中解析

      4、 releaseShared()释放共享锁

      public final boolean releaseShared(int arg) {
      // tryReleaseShared方法由子类实现
      if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
      }
      return false;
      }

      1> doReleaseShared()方法

      private void doReleaseShared() {
      //
      for (;;) {
      // 保存head节点快照
      Node h = head;
      // 头结点不为null 并且头节点不等于尾节点,说明链表中还有等待节点
      if (h != null && h != tail) {
      // 理应要唤醒后面的节点,因为SIGNAL的语义就是必须要唤醒后面的节点
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
      // CAS将线程节点状态由SINGNAL改为0,表示唤醒成功,如果唤醒失败呢?
      // 唤醒失败,表明多个线程想同时释放共享锁,都需要唤醒后继节点,此时state的状态为2 表示可用资源数为2,需要唤醒后面两个等待的共享节点;失败就重试
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue; // loop to recheck cases
      // 唤醒后继节点
      unparkSuccessor(h);
      }
      else if (ws == 0 &&
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // Node.PROPAGATE = -3,如CAS成功,表明线程A释放了一个共享锁唤醒了线程C,且线程C还没有将head节点更新到C上,B线程通过`doReleaseShared()`又释放了一个共享锁,释放锁的过程中发现head节点B的状态是0,这时就会将head节点B的状态设置为PROPAGATE状态。
      // 假如没有这个else if片段,B就不会唤醒D。感觉我有两个信号量,然而只有一个C运行了,D木得了。
      continue; // loop on failed CAS
      }
      // 头结点没有被改变过,且h != null && h != tail 成功,退出循环
      if (h == head)
      break;
      }
      }

      5、 acquireInterruptibly()以响应中断的方式获取互斥锁

      public final void acquireInterruptibly(int arg)
      throws InterruptedException {
      // 就比正常的多这一步,如果线程被中断了,直接抛出异常返回
      if (Thread.interrupted())
      throw new InterruptedException();
      if (!tryAcquire(arg))
      doAcquireInterruptibly(arg);
      }

      总结

      子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS完成。


举报

相关推荐

0 条评论