0
点赞
收藏
分享

微信扫一扫

AQS 和 ReetrantLcok 特征和使用介绍


AbstractQueuedSynchronizer 简介

AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。但只是为了获得同步而只追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。

AQS 的特征:

  1. 阻塞等待队列
  2. 共享/独占
  3. 公平/非公平
  4. 可重入
  5. 允许中断

核心属性 int sate

  1. state 表示共享属性被 volatile 修饰

AQS 和 ReetrantLcok 特征和使用介绍_公平锁

三个核心方法:

  1. getState()
  2. setState()
  3. compareAndSetState()

AQS 和 ReetrantLcok 特征和使用介绍_自定义_02

两种资源共享方式:

  • Exclusive-独占,只有一个线程可以访问,如 ReetrantLock
  • Share 共享,多个线程可以同时执行,如:Semaphore/CountDownLatch

AQS 定义两种队列

  • 同步等待队列:主要是用于维护获取互斥失败时入队的线程
  • 条件等待队列:调用 await() 的时候会释放锁,点燃后线程会加入到套件队列,调用 signal() 唤醒的时候把条件队列的节点移动到同步队列中,等待再次获取锁。

AQS 队列节点中的 5 种状态

  1. 值为 0 表示初始化状态,表示当前节点在 sync 队列中,等待获取锁。
  2. CANCELLED , 值为1 , 表示当前的线程被取消;
  3. SIGNAL,值为 -1,表示当前的线程被取消;
  4. CONDITION,值为 -2,表示当前节点的后继节点的线程需要运行,也就是 unpark;
  5. PROPAGAGTE 值为-3,表示当前场景下后续的 acquireShard 能够继续执行。

不同的自定义同步器竞争共享资源的方式也不同,自定义同步器在实现时自需要实现共享资源 state 的获取与实现方式即可,至于具体线程等待队列的维护(如获取资源失败入队、出队等),AQS 已经实现好了,自定义同步器时主要实现一下几个方法(AQS 其实是一个典型的模板方法模式的运用):

  • isHeldExclusively() 该线程是否正在独占资源。只有使用到 condition 才需要去实现。
  • tryAcquire(int):独占方式。尝试获取资源,成功返回 true,失败返回 false。

AQS 和 ReetrantLcok 特征和使用介绍_后端_03

  • tryRelease(int):独占方式。尝试释放资源,成功返回 true,失败返回 false。
  • tryAcquireShared(int): 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int): 共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回 true , 否则返回 false。

自定义独占锁

public class Liu666Lock extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() {
acquire(1);
}

public void unlock() {
release(1);
}
}

测试一下: AQS 和 ReetrantLcok 特征和使用介绍_后端_04 输出结果如下: AQS 和 ReetrantLcok 特征和使用介绍_等待队列_05多执行几次我们可以观察,虽然执行的顺序不一定有序,但是我们最终结果是始终 idx = 10

同步等待

AQS 当中的同步等待队列也称为 CLH 队列,CLH队列是 Craig、Landin、Hagersten 三人发明的一种基于双向链表数据结构的队列,是 FIFO 先进先出等待队列,Java 的 CLH 队列是原自 CLH 队列的一个变种实现,线程由原自旋机制改为阻塞机制。

AQS 依赖 CLH 同步队列来完成同步状态的管理:

  • 当前线程如果获取同步状态失败时,AQS 则会将当前线程已经等待状态信息构造成一个节点(Node)并将其加入到 CLH 同步队列,同时会阻塞当前线程
  • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
  • 通过 signal 或者 signalAll 将条件队列中的节点转移到同步队列。(由条件队列转换为同步队列)

AQS 和 ReetrantLcok 特征和使用介绍_等待队列_06

条件等待队列

AQS 中条件队列是使用单向链表保存的,用 nextWaiter 属性来连接

  • 调用 await 方法阻塞线程;
  • 当前线程存储同步队列头节点,调用 await 方法进行阻塞(从同步队列转换到条件队列)

Condition 接口

AQS 和 ReetrantLcok 特征和使用介绍_等待队列_07

  1. 调用 Condition#await 方法会释放当前持有的锁,然后阻塞当前线程,同时像 Condition 队列尾部添加一个节点,所以调用 Condition#await 方法的时候必须持有锁。
  2. 调用 Condition#signal 方法会将 Condition 队列的首节点移动到队列尾部,然后唤醒调用 Condition#awite 方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用 Condition#signal 方法必须持有锁,持有锁的线程唤醒被因调用 Condition#await 方法而阻塞的线程。

等待唤醒机制 await/signal 实验

@Slf4j
public class ConditionTest {

public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

new Thread(() -> {
lock.lock();
try {
log.info(Thread.currentThread().getName() + "开始执行任务");
condition.await();
log.info(Thread.currentThread().getName() + "任务执行结束");
} catch (Throwable e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.info(Thread.currentThread().getName() + "开始执行任务");
TimeUnit.SECONDS.sleep(2);
condition.signal();
log.info(Thread.currentThread().getName() + "任务执行结束");
} catch (Throwable e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
}

从下面的结果我们可以看到 t1 线程获取锁过后,调用 ​​await​​方法进入阻塞状态并且释放锁,然后 t2 线程获取锁,并且去唤醒 t1 线程继续执行,这就是一个简单的条件队列例子。

输出结果如下: AQS 和 ReetrantLcok 特征和使用介绍_公平锁_08

ReetrantLock 与 synchroinzed 比较

  • synchroinzed 是 JVM 层次的锁实现,ReetrantLock 是 JDK 层次的锁实现;
  • synchroinzed 的锁状态是无法在 Java 代码中直接判断的,但是 ReetrantLock 可以通过 ReetrantLock#isLock 判断;
  • synchroinzed 是非公平锁,ReetrantLock 是可以公平的也可以是非公平的;
  • synchroinzed 是不可以被中断的,而 ReetrantLock#lockInterruptibly 方法是可以中断锁的;
  • 在发生异常的时候 synchroinzed 会自动释放锁,而 ReetrentLock 需要开发者在 finaly 代码块中显示释放锁;
  • ReetrantLock 获取锁的形式有很多中:如立即返回是否成功的 tryLock(),以及等嗲指定时长的获取,更加灵活;
  • synchroinzed 在特定的情况下已经在等待的线程是后来的线程先获得锁(回顾一下 synchroinzed 唤醒策略),而 ReetranLock 对于已经正在等待的线程是先来的先获取锁。

ReetrantLcok 特征

reetrantlock 是一种基于 aqs 框架的应用实现。是基于 JDK 的一种线程同步手段,他的功能类似与 synchronized 是一种互斥锁,相对于 synchronized 具备一下特点:

  • 可中断
  • 可设置超时时间
  • 可设置公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

ReetrantLcok 部分源码: AQS 和 ReetrantLcok 特征和使用介绍_后端_09

ReetrantLock 使用范式

使用方式:

// 1. 创建锁(默认非公平锁)
ReentrantLock lock = new ReentrantLock(false);
// 2. 加锁
lock.lock();
try {
// 3. todo 原子操作
} finally {
// 4. 解锁
lock.unlock();
}

可重入特征

下面我们来测试一下 ReetrantLock 的几个特征:

  • 可重入,就是说在一个线程内可以多次获取锁。下面是一个简单的例子:
public static void lockReentrant() {
ReentrantLock lock = new ReentrantLock();
lock.lock();
log.info("main 线程获取锁 1 次");
lock.lock();
log.info("main 线程获取锁 1 次");
lock.unlock();
log.info("main 线程解锁 1 次");
lock.unlock();
log.info("main 线程解锁 1 次");
}

输出结果如下: AQS 和 ReetrantLcok 特征和使用介绍_公平锁_10

可中断特征

代码如下:

ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
log.info(Thread.currentThread().getName() + " 启动。。。");
try {
lock.lockInterruptibly();
log.info(Thread.currentThread().getName() + " 成功获取锁。。。");
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
log.info(Thread.currentThread().getName() + " 等待锁的过程中被中断。。。");
}
}, "t1");

lock.lock();
try {
t1.start();

log.info(Thread.currentThread().getName() + " 成功获取锁。。。");
Thread.sleep(2000);

t1.interrupt();
log.info("t1 执行中断。。。");
} catch (InterruptedException e) {
e.printStackTrace();
log.info(Thread.currentThread().getName() + " 等待锁的过程中被中断。。。");
} finally {
lock.unlock();
}

这个场景主要是模仿,对于线程中断的场景,然后放弃锁的获取,减少锁的无效竞争者。 输出结果如下: AQS 和 ReetrantLcok 特征和使用介绍_公平锁_11

  • 设置获取锁的超时时间,比如我们对于一些互斥操作, 只能让一个线程获取成功,但是允许其他线程在允许的时候内重试,来保证最大的并发执行。
@Slf4j
public class ReentrantLockTest {


public static void main(String[] args) {
lockTimeOut();
}

public static void lockTimeOut() {
// 1. 创建一个 ReentrantLock 实例
ReentrantLock lock = new ReentrantLock();
// 2. 创建线程 t1
Thread t1 = new Thread(() -> {
log.debug("t1 线程启动。。。。");
try {
// t1 尝试获取锁,锁获取超时时间 1s
if (lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("t1 线程等待 1s 后 获取锁失败");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 如果是自己获取锁才去解锁
if (lock.isHeldByCurrentThread()) { lock.unlock(); }
}
}, "t1");

// 3. 主线程获取锁
lock.lock();
try {
log.debug("main 线程获取锁成功");
// 4. 启动 t1 线程
t1.start();
// 5. 休眠 2s
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 如果是自己获取锁才去解锁
if (lock.isHeldByCurrentThread()) { lock.unlock(); }
}
}
}

输出结果: AQS 和 ReetrantLcok 特征和使用介绍_公平锁_12

公平与非公平特征

我们通过 ​​ReentrantLock​​​的构造方法可以看出,ReentrantLock 默认为非公平。 AQS 和 ReetrantLcok 特征和使用介绍_自定义_13为什么是非公平呢?因为大多数场景,其实对于锁来说都是并发竞争临界资源,非公平锁可以减少 AQS 的出队,入队操作提高性能,缺点就是非常极端的情况下,会导致线程饥饿。但是如果大量的排队问题我们更应该去关注程序本身,或者用 MQ 体系来代替锁机制。

Condition 总结

java.util.concurrent 类库中提供 Condition 类来实现线程之间的协调。调用 Condition.await() 方法使线程等待,其他线程调用 Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。注意:调用Condition的await() 和 signal() 方法,都必须在 lock 保护之内。 还是之前的例子,我们在回顾一下上面的定义和规律 AQS 和 ReetrantLcok 特征和使用介绍_自定义_14

参考资料

  1. ​​baike.baidu.com/item/Abstra…​​
举报

相关推荐

0 条评论