AQS之排斥锁分析

阅读 123

2022-11-05


文章目录

  • ​​1.AQS 内部体系架构​​
  • ​​2.获取资源​​
  • ​​2.1 获取资源 tryAcquire()e​​
  • ​​2.2 加入队列 addWaiter()​​
  • ​​2.3 排队获取资源 acquireQueued()​​
  • ​​2.4 阻塞检查 shouldParkAfterFailedAcquire()​​
  • ​​3.释放资源​​
  • ​​3.1 唤醒后继节点 unparkSuccessor()​​
  • ​​4.获取&释放资源总流程​​
  • ​​5.其他获取资源的方法​​
  • ​​5.1 响应中断 acquireInterruptibly()​​
  • ​​5.2 超时参数 tryAcquireNanos()​​

1.AQS 内部体系架构

AQS之排斥锁分析_释放资源

  • FairSync: 公平锁
  • NoFairSync: 非公平锁

AQS之排斥锁分析_java_02

  • Shared: 共享模式
  • Exclusive: 排他模式

2.获取资源

AbstractQueueSynchronized.acquire()

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

  1. 尝试获取资源,成功则结束
  2. 获取资源失败,新建节点插入队列
  3. 在队列里排队并获取资源
  4. 如果等待过程中出现线程中断,则中断自身
    (1) 首先就是一次资源获取的尝试,具体尝试逻辑由各实现类实现,失败再加入队列
    (2) 该方法不响应中断,如果在等待过程线程被中断,线程不会响应,只会在等待结束后中断重新自身

2.1 获取资源 tryAcquire()e

FairSync:公平

AQS之排斥锁分析_线程中断_03

NoFairSync: 非公平

AQS之排斥锁分析_抛出异常_04

区别为 hasQueuedPredecessors()方法

public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

这个方法可以使得公平和不公平, 去判断有没有别的线程排在了当前线程的前面。

先是tail, 后是head, 如果这二者只有一个为null(另一个不为null), 只可能为tail = null, head 不为null

AQS之排斥锁分析_线程中断_05

2.2 加入队列 addWaiter()

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

enq()

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

初始化一个排斥模式的节点加入到队列尾部,tail指向该节点。本方法先进行一次快速尝试,失败后在一个循环中自旋尝试。

2.3 排队获取资源 acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

本方法是核心逻辑,实现了一个节点在队列中排队、阻塞到获取资源成功的一个逻辑,具有返回值,返回排队过程是否线程被中断过。
最后考虑到排队过程中抛出异常的场景,加了一个finally的处理,对这类厂家取消排队获取资源。

2.4 阻塞检查 shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//唯一符合的情况
if (ws == Node.SIGNAL)
return true;
//不符合怎么办
if (ws > 0) {
//ws>0说明节点无效,调整排队位置,向前找到有效的前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//修改前驱节点状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

  1. 符合阻塞的条件:前驱节点状态为singal
  2. 不符合怎么办:调整队列,找到有效的前驱节点后,修改前驱节点状态

parkAndCheckInterrupt(): 阻塞方法

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

只有被唤醒或者线程中断才能退出阻塞, 最后一句检查并清理中断标志,所以acquire方法中如果被中断过,需要进行一次自我中断。

3.释放资源

AQS之排斥锁分析_线程中断_06

尝试释放资源tryRelease也是交给实现类去实现。

Sync.tryRelease()

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

3.1 唤醒后继节点 unparkSuccessor()

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

  1. 修改当前节点的状态标志
  2. 找到下一个节点,如果存在则唤醒线程

4.获取&释放资源总流程

AQS之排斥锁分析_AQS_07

5.其他获取资源的方法

5.1 响应中断 acquireInterruptibly()

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

  1. 尝试获取资源时进行一次中断检查
  2. 阻塞时被中断抛出异常

5.2 超时参数 tryAcquireNanos()

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

该方法是响应中断的, 超时中断

  1. 排队获取资源时先判断是否已经超时
  2. 每次判断是否需要阻塞时都先判断是否超时
  3. 阻塞时增加检查,时间不足只进行自旋,减少阻塞的性能消耗
  4. 阻塞时阻塞指定时间


精彩评论(0)

0 0 举报