【Java】AbstractQueuedSynchronizer

阅读 127

2022-11-11


综述:

全称AbstractQueuedSynchronizer,是java.util.concurrent.locks包下的一个类。类的注释写的很详细。
该类为很多并发场景提供了一个基于FIFO等待队列的用于解决阻塞锁或者信号量的通用框架。使用的时候,我们需要写一个子类实现抽象方法,然后使用组合代理模式实现锁或者信号量等具体实现类。

下面看下实现。

1.内部类Node:

既然是队列,肯定需要定义节点Node,这里是双向链表,所以Node里有prev和next两个指针。除此之外,有一个waitStatus变量,用于标识该节点的状态,每一个节点代表一个线程,其实也就是线程的状态,主要有以下几个:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

只要一个cancel的状态是大于0的,表示该节点已经超时或者被中断,不需要继续等待了。

2.AbstractQueuedSynchronizer:

属性有head和tail,代表队列的头和尾。
为了方便理解,这里假设AQS用在阻塞锁的场景下。

整个AQS的思想是先进行一次CAS的快速尝试,如果失败了再进行常规等待。

关键属性state,在阻塞锁场景下,可以理解为重入次数。不加锁的时候是0,枷锁时需要cas设置为1。
方法

先看获取的:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

整个逻辑都在if里面了,先调用tryAcquire进行一次快速获取,看能否获取成功,如果不成功就加入队列中等待。

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

在AbstractQueuedSynchronizer里面,tryAcquire方法是抽象的,因为获取逻辑会与锁的具体实现捆绑,比如是否可重入,是否公平等。入队首先调用addWaiter方法新建一个node。

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

addWaiter方法仅仅需要新new一个node,然后链入链表即可。先通过CAS设置tail指针进行快速尝试,如果不行就进入常规等待方式入队,也就是enq方法:

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

enq方法是一个CAS+循环的实现方式,不断尝试直到入队成功。    
至此,代表新线程的节点已经链入队列中。
接着就是调用acquireQueued方法来等待直到有机会获取到锁。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

那么首先要明白什么时候可以获取锁?
只有head的后继节点有权利尝试获取,注意不是一定获取到。for循环里的第一个if做的就是这个判断。听上去怪怪的?为啥不是head呢?head就是当前持有锁的线程,只有head释放了,head后面的才有机会获取。如果命中了第一个if,说明已经拿到锁了,就把当前节点设置为head。否则,就需要找一个节点等待,也就是shouldParkAfterFailedAcquire方法。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire方法所做的就是从当前节点往前找,知道有一个前驱的状态是小于0的,也就是非cancel的。为什么不是按顺序等待?因为前面的节点可能取消等待了,所以需要向前找。
当找到何时节点以后,就调用parkAndCheckInterrupt方法阻塞当前线程,线程就会卡死在这。

再看释放的:

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

调用tryRelease释放,tryRelease也是抽象的,原因前面说过,和策略有关,可定制。

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

释放不存在竞争,因为只有一个线程能获取。所以这里的逻辑相对简单。释放成功需要唤醒队列后继节点的线程。

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

主要在后面的代码,从后继节点一直找一个status小于0的节点。然后唤醒该节点所代表的线程。
这个线程就会执行之前acquireQueued方法的for循环,此时它的前驱就是head,那么他就可以获取锁了。最终它会把head设置为自己。

这就是AQS的大致逻辑。

总结如下:

1.使用CAS实现快速获取;
2.如果不能获取到,就进入等待队列,只有head的后继有资格获取。head释放锁会唤醒后继节点,这个过程不断循环下去。

AQS通过FIFO队列实现了一个同步器,将获取过程抽象,将入队出队等过程实现出来,我们用AQS只要实现抽象方法即可,推荐组合+代理的方式。代码的注释还给了例子:

*  <pre> {@code
* class Mutex implements Lock, java.io.Serializable {
*
* // Our internal helper class
* private static class Sync extends AbstractQueuedSynchronizer {
* // Reports whether in locked state
* protected boolean isHeldExclusively() {
* return getState() == 1;
* }
*
* // Acquires the lock if state is zero
* public boolean tryAcquire(int acquires) {
* assert acquires == 1; // Otherwise unused
* if (compareAndSetState(0, 1)) {
* setExclusiveOwnerThread(Thread.currentThread());
* return true;
* }
* return false;
* }
*
* // Releases the lock by setting state to zero
* protected boolean tryRelease(int releases) {
* assert releases == 1; // Otherwise unused
* if (getState() == 0) throw new IllegalMonitorStateException();
* setExclusiveOwnerThread(null);
* setState(0);
* return true;
* }
*
* // Provides a Condition
* Condition newCondition() { return new ConditionObject(); }
*
* // Deserializes properly
* private void readObject(ObjectInputStream s)
* throws IOException, ClassNotFoundException {
* s.defaultReadObject();
* setState(0); // reset to unlocked state
* }
* }
*
* // The sync object does all the hard work. We just forward to it.
* private final Sync sync = new Sync();
*
* public void lock() { sync.acquire(1); }
* public boolean tryLock() { return sync.tryAcquire(1); }
* public void unlock() { sync.release(1); }
* public Condition newCondition() { return sync.newCondition(); }
* public boolean isLocked() { return sync.isHeldExclusively(); }
* public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
* public void lockInterruptibly() throws InterruptedException {
* sync.acquireInterruptibly(1);
* }
* public boolean tryLock(long timeout, TimeUnit unit)
* throws InterruptedException {
* return sync.tryAcquireNanos(1, unit.toNanos(timeout));
* }
* }}</pre>

 

精彩评论(0)

0 0 举报