AQS简介
Java中Juc包如图所示,基于volatile和CAS搭建起Juc包,其中AQS是其中很重要的抽象框架,因为上层的许多同步工具类都是基于AQS实现的,如ReetrantLock、CountDownLatch、ReadWriteLock、Condition等等,如果在Java内置的同步工具类不能满足你的开发需求,你还可以根据业务场景自定义同步工具类。AQS是同步框架的抽象,维护着共享资源state的访问,并内置一个同步等待队列,在资源不足时,使线程进入队列等待。AQS中封装了出入队的细节,而上层代码只需要定义资源的获取和释放方式
四个重要的方法
根据资源的是否可共享性,可以有选择性地实现独占的tryAcquire、tryRelease,或者实现共享的tryAcquireShared、tryReleaseShared方法。甚至ReadWriteLock实现了两种资源占用方式。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
ReetrantLock源码
之所以通过ReetrantLock来切入AQS,是因为AQS是一个比较抽象的框架,而ReetrantLock的lock()和unlock()方法是我们经常调用的,通过ReetrantLock能够更加容易理解AQS。
lock()
lock()方法利用CAS将state从0置为1,CAS原子操作,保证多线程环境下只有一个线程可以执行成功,然后该线程调用setExclusiveOwnerThread设置独占线程,这是为了线程的可重入而设置的。而其他线程没有设置成功,就会进入aquire方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
aquire()
aquire()方法是AQS中定义的方法,用final修饰,不可重写。这个方法先是尝试获取资源,成功的话就返回。否则调用addWaiter初始化Node,并设置为独占模式并入队,acquireQueued方法则是将当前线程结点的前继结点状态修改为SIGNAL,表明当前继结点获取到资源时,会通知后继者,将其唤醒。同时acquireQueued会返回是否中断的标识。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAquire
如果看AQS的源码,我们会发现这个方法直接抛出异常,是因为这个方法是留给子类来实现的。那么它为什么不直接定义为抽象方法呢?是因为如果定义为抽象方法,那么独占和共享模式都必须重写tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared等方法。对于只需要实现一种模式的子类来说没有必要。我们来看看ReetrantLock中非公平锁的tryAquire方法。
通过getState方法获取资源,如果c为0,说明资源可获取,就用CAS获取,并设置独占线程,返回成功true。如果没有资源了,判断当前线程是否是资源的独占者,如果是,就把state更新。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter
这个方法,目的就是把获取不到资源的线程进入到同步队列进行等待。如果当前在队列已经初始化过了,直接用CAS更新tail结点,更新指针。要不然调用enq方法入队,enq方法其实就是初始化头结点,再把当前节点插入。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
入队之后,AQS在调用park阻塞自身之前还先看前继是不是头结点,如果前继是头结点,说明如果资源释放了,就先轮到head节点,于是就调用tryAcquire尝试获取,如果获取成功就把head节点释放,当前节点设置为head节点,然后返回。如果前继不是头结点,获取头结点没有获取到资源,就调用shouldParkAfterFailedAcquire方法。shouldParkAfterFailedAcquire方法主要是检查前继结点状态是否为SIGNAL,如果不是,就用CAS设置为SIGNAL。这样本线程结点就可以调用parkAndCheckInterrupt中的LockSupport.park(this);将自己阻塞,注意这里的阻塞跟synchronized不同,park调用会使线程变为WAITING状态,等待唤醒unpark由于是在for循环中调用的,因此这个方法会使CPU一直处于空忙。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
unlock()
unlock释放锁
public void unlock() {
sync.release(1);
}
release
release是AQS定义的释放独占锁的方式,由于锁是独占的,那么释放独占锁也不存在线程竞争,调用子类定义的tryRelease释放资源,然后获取头结点,将头结点的后继结点唤醒。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}