点击上方“程序员蜗牛g”,选择“设为星标”跟蜗牛哥一起,每天进步一点点
程序员蜗牛g大厂程序员一枚 跟蜗牛一起 每天进步一点点31篇原创内容公众号
了解并合理使用线程池,是一个开发人员必修的基本功。
Exchanger
的使用和原理
线程池是什么?
Chaya:“线程池有什么好处呢?”
使用场景
不聊原理,先说下如何使用。
Web 应用的并发请求处理
Web 应用通常需要同时处理多个用户的请求。为了不每个请求都创建一个新线程,可以使用线程池来复用一定数量的线程:
public class WebServer {
// 创建固定大小的线程池¡¡以处理用户请求
privatestaticfinal ExecutorService executor = Executors.newFixedThreadPool(100);
public static void handleRequest(HttpRequest request) {
CompletableFuture.runAsync(() -> {
// 处理请求
processRequest(request);
}, executor);
}
private static void processRequest(HttpRequest request) {
// 处理请求的实现
}
}
后台任务和定时任务
应用程序可能需要定期执行一些后台任务,如数据库的清理工作。
可以使用ScheduledThreadPoolExecutor来安排这些任务:
public class BackgroundJobScheduler {
privatestaticfinal ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
public static void startCleanupJob() {
// 这里执行清理任务
scheduler.scheduleAtFixedRate(BackgroundJobScheduler::performCleanup
, 0, 1, TimeUnit.HOURS);
}
private static void performCleanup() {
// 清理工作的实现
}
}
异步操作
用户下单后可能需要进行一系列后台操作,比如发送确认邮件、通知仓库出货等。
public class ECommerceApplication {
privatestaticfinal ExecutorService pool = Executors.newCachedThreadPool();
public static void completeOrder(Order order) {
// 异步发送确认邮件
pool.execute(() -> sendConfirmationEmail(order));
// 异步通知仓库
pool.execute(() -> notifyWarehouse(order));
}
private static void sendConfirmationEmail(Order order) {
// 邮件发送逻辑
}
private static void notifyWarehouse(Order order) {
// 仓库通知逻辑
}
}
Executor 线程池核心设计
public interface Executor {
void execute(Runnable command);
}
我们首先来看一下 ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask、ForkJoinPool
的 UML 类图,全局上了解下线程池的继承关系。
ExecutorService
接口增加了一些能力:
- 扩充执行任务的能力:可以为一个或一批异步任务生成 Future 的方法;
- 提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService
则是上层的抽象类,实现了 ExecutorService
,模板方法模式的运用,将执行任务的流程串联了起来,由子类继承,将变化点交给子类实现,保证下层的实现只需关注一个执行任务的方法即可。
ScheduledThreadPoolExecutor
线程池的特性是定时调度,专门设计了一个接口 ScheduledExecutorService
并继承接口 ExecutorService
,这就是单一职责的体现了,家人们!
该接口主要定义了定时调度的方法。
最下层就分别有 ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool
。
ThreadPoolExecutor 原理
ThreadPoolExecutor 运行机制是什么?如何同时维护线程状态和执行任务的呢?
核心组件关系如下图所示:
任务运行机制
“从全局视角说下线程池的运行机制把。”
其运行机制如下图所示:
流程图如下:
为了让你更容易理解,再画一个时序图。
Executor
:线程池任务调度入口;Queue
:阻塞队列Worker
:实现Runnable
并继承AbstractQueuedSynchronizer
,线程池中的任务线程抽象。RejectedHandler
:拒绝策略。
接下来我们分别分析线程池核心组件的作用和实现原理。
状态控制(ctl 变量)
如何维护线程池运行状态和工作线程呢?
使用 32 位整型 AtomicInteger 同时维护线程池的运行状态和工作线程:
- 高 3 位:线程池状态(
RUNNING, SHUTDOWN, STOP 等)
- 低 29 位:工作线程数量
Java 中的线程池具有不同的状态,这些状态反映了线程池在其生命周期中的不同阶段和行为。主要的线程池状态有以下几种:
(运行中) | 表示线程池正在正常运行,并且可以接受新的任务提交。在这种状态下,线程池可以执行任务,并且可以创建新的线程来处理任务。 |
(关闭中) | 表示线程池正在关闭中。在这种状态下,线程池不再接受新的任务提交,但会继续执行已提交的任务,直到所有任务执行完成。 |
(停止) | 表示线程池已经停止,不再接受新的任务提交,并且尝试中断正在执行的任务。 |
(终止) | 表示线程池已经终止,不再接受新的任务提交,并且所有任务已经执行完成。在这种状态下,线程池中的所有线程都已经被销毁。 |
通过 ctl 字段,ThreadPoolExecutor
类能够高效地维护线程池的状态和线程数量信息,从而实现了对线程池的有效管理和控制。
“爱一个人会变,线程池状态又如何变化呢?”
线程池的状态不是直接设置的,而是通过调用 shutdown
()、shutdownNow
() 等方法触发状态的转换。
例如,调用 shutdown
() 方法会将线程池的状态从 RUNNING
转换为 SHUTDOWN
。
其生命周期转换如下入所示:
阻塞队列——任务缓冲
线程池中是通过阻塞队列实现生产者-消费者模式。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
使用不同的队列可以实现不一样的任务存取策略。
拒绝策略
“如果无止尽的海量任务丢给线程池,线程池处理不过来了?”
这时候就要设计一个拒绝策略了,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
除了上述标准拒绝策略之外,您还可以实现 RejectedExecutionHandler
接口来定义自定义的拒绝策略。
这样你就可以根据应用程序的需求实现更复杂的拒绝逻辑。
RejectedExecutionHandler
接口:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Worker 线程管理
“线程池如何管理线程的状态和生命周期呢?”
设计了一个工作线程 Worker 来管理。
private finalclass Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // 禁止中断直到runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 省略AQS方法实现...
}
Worker 执行任务的模型如下图所示:
线程池如何管理线程生命周期?
线程池使用一张 HashSet 表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。
Worker 线程复用
线程池线程回收复用过程如图所示:
Worker 线程增加
执行流程如下图所示:
Worker 线程垃圾回收
图10 线程销毁流程
线程回收的工作是在 processWorkerExit
方法完成的。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 异常终止才需要补偿
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 从集合中移除
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试终止线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 计算最小保留线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false); // 补充新Worker
}
}
场景类型 | 触发条件 | 处理方式 |
正常退出 | getTask()返回 null | 检查是否需要补充新 Worker |
异常退出 | 任务执行抛出未捕获异常 | 立即补充新 Worker |
配置变更 | 核心线程数调整 | 动态调整存活 Worker 数量 |
线程池关闭 | shutdown/shutdownNow 调用 | 中断所有 Worker 并清空队列 |
Worker 线程执行任务
任务执行主流程如下:
源码分析:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 获取Worker锁
// 检查线程池状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 实际执行任务
} catch (Throwable x) {
thrown = x;
throw x;
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
设计亮点分析
- 无锁化设计:通过 CAS 操作修改 ctl 状态,避免全局锁竞争
- 线程复用:
Worker
循环从队列获取任务,减少线程创建开销 - 弹性扩容:
corePoolSize
维持常驻线程,maximumPoolSize
应对突发流量 - 优雅降级:队列缓冲+拒绝策略防止资源耗尽
ScheduledThreadPoolExecutor
有了前文的线程池实现原理做铺垫,掌握 ScheduledThreadPoolExecutor
就轻松多了。
“ScheduledThreadPoolExecutor 是什么?”
使用场景
光说不练假把式,想要学习一个框架的原理,第一步先要理解使用场景,并把它跑起来。
1. 定时任务调度
image-20250426171745139
代码案例如下:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// 系统启动后5秒执行初始化
scheduler.schedule(() -> initConfig(), 5, TimeUnit.SECONDS);
// 每天9:30执行数据归档(需计算初始延迟)
long initialDelay = calculateDelay(9, 30);
scheduler.scheduleAtFixedRate(() -> archiveData(),
initialDelay, 24 * 60 * 60, TimeUnit.SECONDS);
2. 心跳监测机制
scheduler.scheduleWithFixedDelay(() -> {
try {
HeartbeatResponse res = httpClient.checkHealth();
if (res.isHealthy()) {
resetFailureCount();
}
} catch (TimeoutException e) {
if (incrementAndGetFailureCount() > 3) {
alertSystem.sendCriticalAlert();
}
}
}, 0, 30, TimeUnit.SECONDS); // 立即开始,间隔30秒
最佳防御式编程示例
scheduler.scheduleAtFixedRate(() -> {
try {
// 业务代码
processBusinessLogic();
// 添加健康检查点
if (!systemStatus.isHealthy()) {
thrownew ServiceUnavailableException();
}
} catch (BusinessException e) {
// 业务可恢复异常
logger.warn("业务处理警告", e);
} catch (Throwable t) {
// 不可恢复异常处理
logger.error("致命错误触发任务终止", t);
emergencyRepair();
}
}, 0, 1, TimeUnit.MINUTES);
优雅关闭
public void gracefulShutdown(ScheduledThreadPoolExecutor executor) {
executor.shutdown(); // 禁止新任务提交
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
List<Runnable> dropped = executor.shutdownNow();
logger.warn("强制关闭,丢弃{}个任务", dropped.size());
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
logger.error("线程池未完全关闭");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
实现原理
看下类图,可以发现 ScheduledThreadPoolExecutor
继承 ThreadPoolExecutor
,并实现了 ScheduledExecutorService
接口。
public interface ScheduledExecutorService extends ExecutorService {
/**
* 安排一个Runnable任务在给定的延迟后执行。
*
* @param command 需要执行的任务
* @param delay 延迟时间
* @param unit 时间单位
* @return 可用于提取结果或取消的ScheduledFuture
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 安排一个Callable任务在给定的延迟后执行。
*
* @param callable 需要执行的任务
* @param delay 延迟时间
* @param unit 时间单位
* @return 可用于提取结果或取消的ScheduledFuture
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* 安排一个Runnable任务在给定的初始延迟后首次执行,随后每个period时间间隔执行一次。
*
* @param command 需要执行的任务
* @param initialDelay 首次执行的初始延迟
* @param period 连续执行之间的时间间隔
* @param unit 时间单位
* @return 可用于提取结果或取消的ScheduledFuture
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 安排一个Runnable任务在给定的初始延迟后首次执行,随后每次完成任务后等待指定的延迟再次执行。
*
* @param command 需要执行的任务
* @param initialDelay 首次执行的初始延迟
* @param delay 每次执行结束后的延迟时间
* @param unit 时间单位
* @return 可用于提取结果或取消的ScheduledFuture
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledExecutorService
接口继承了 ExecutorService
接口,并增加了几个定时相关的接口方法。前两个方法用于单次调度执行任务,区别是有没有返回值。
核心结构
可以转换成如下的数组:
在这种结构中,可以发现有如下特性。假设,索引值从 0 开始,子节点的索引值为 k,父节点的索引值为 p,则:
- 一个节点的左子节点的索引为:k = p * 2 + 1;
- 一个节点的右子节点的索引为:k = (p + 1) * 2;
- 一个节点的父节点的索引为:p = (k - 1) / 2。
任务调度执行全流程。
我们先来看下 ScheduledThreadPoolExecutor
的构造方法,其实在 executors
框架概述中讲 Executors
时已经接触过~
Executors
使用 newScheduledThreadPool
工厂方法创建 ScheduledThreadPoolExecutor
:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
我们来看下 ScheduledThreadPoolExecutor
的构造器,内部其实都是调用了父类 ThreadPoolExecutor
的构造器,这里最需要注意的就是任务队列的选择——DelayedWorkQueue
.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
执行过程
ScheduledThreadPoolExecutor
的核心调度方法是 schedule
、scheduleAtFixedRate
、scheduleWithFixedDelay
,我们通过 schedule
方法来看下整个调度流程:
// delay时长后执行任务command,该任务只执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 这里的decorateTask方法仅仅返回第二个参数
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
// 延时或者周期执行任务的主要方法,稍后统一说明
delayedExecute(t);
return t;
}
上述的 decorateTask
方法把 Runnable
任务包装成 ScheduledFutureTask
,用户可以根据自己的需要覆写该方法。
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
/**
* 任务序号, 自增唯一
*/
privatefinallong sequenceNumber;
/**
* 首次执行的时间点
*/
privatelong time;
/**
* 0: 非周期任务
* >0: fixed-rate任务
* <0: fixed-delay任务
*/
privatefinallong period;
/**
* 在堆中的索引
*/
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// ...
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 线程池已关闭
reject(task); // 任务拒绝策略
else {
super.getQueue().add(task); // 将任务入队
// 如果线程池已关闭且该任务是非周期任务, 则将其从队列移除
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false); // 取消任务
else
ensurePrestart(); // 添加一个工作线程
}
}
ScheduledThreadPoolExecutor
的整个任务调度流程大致如下图:
我们来分析这个过程:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
ForkJoinPool
分治任务模型
分治任务模型可分为两个阶段:
- 一个阶段是 任务分解,就是迭代地将任务分解为子任务,直到子任务可以直接计算出结果;
- 另一个阶段是 结果合并,即逐层合并子任务的执行结果,直到获得最终结果。
下图是一个简化的分治任务模型图,你可以对照着理解。
工作窃取
有的线程执行比较快,如何提高效率让闲着去抢任务呢?
工作线程worker1
、worker2
以及worker3
都从taskQueue
的尾部popping
获取task
,而任务也从尾部Pushing
,当worker3
队列中没有任务的时候,就会从其他线程的队列中取stealing
,这样就使得worker3
不会由于没有任务而空闲。
时序图如下:
案例
- 大数据处理:数组排序、矩阵运算(参考案例:百万级数据求和效率提升 3 倍)
- 分治算法:快速排序、归并排序、斐波那契数列计算。
- 并行流基础:Java 8 的
parallelStream()
底层实现。
大数据集并行处理
public interface Calculator {
/**
* 把传进来的所有numbers 做求和处理
*
* @param numbers
* @return 总和
*/
long sumUp(long[] numbers);
}
ForkJoinCalculator 实现 Calculator
接口,内部类 SumTask
继承 RecursiveTask
抽象类,并在 compute
方法中定义拆分逻辑及计算。
最后在 sumUp
方法中调用 pool
方法进行计算。
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
// 1. 定义计算逻辑
privatestaticclass SumTask extends RecursiveTask<Long> {
privatelong[] numbers;
privateint from;
privateint to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法为ForkJoin的核心方法:对任务进行拆分 拆分的好坏决定了效率的高低
@Override
protected Long compute() {
// 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else {
// 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
// 也可以使用公用的线程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
}
实现原理
ForkJoinPool
作为 Executors
框架的一员,从外部看与其它线程池并没有什么区别,仅仅是 ExecutorService
的一个实现类。
核心类关系图。
ForkJoinPool
对象的构建有两种方式:
- 通过 3 种构造器的任意一种进行构造;
- 通过
ForkJoinPool.commonPool()
静态方法构造。
JDK8 以后,ForkJoinPool 又提供了一个静态方法 commonPool(),这个方法返回一个 ForkJoinPool 内部声明的静态 ForkJoinPool 实例,主要是为了简化线程池的构建,这个 ForkJoinPool 实例可以满足大多数的使用场景。
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
ForkJoinTask
从 Fork/Join 框架的描述上来看,“任务”必须要满足一定的条件:
- 支持
Fork
,即任务自身的分解 - 支持
Join
,即任务结果的合并
因此,J.U.C
提供了一个抽象类——ForkJoinTask,来作为该类 Fork/Join
任务的抽象定义:
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
public abstractclass RecursiveAction extends ForkJoinTask<Void> {
/**
* 该任务的执行,子类覆写该方法
*/
protected abstract void compute();
public final Void getRawResult() { returnnull; }
protected final void setRawResult(Void mustBeNull) { }
protected final boolean exec() {
compute();
returntrue;
}
}
public abstractclass RecursiveTask<V> extends ForkJoinTask<V> {
/**
* 该任务的执行结果.
*/
V result;
/**
* 该任务的执行,子类覆写该方法
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
protected final boolean exec() {
result = compute();
returntrue;
}
}
ForkJoinWorkerThread
Fork/Join
框架中,每个工作线程(Worker
)都有一个自己的任务队列(WorkerQueue
), 所以需要对一般的 Thread 做些特性化处理,J.U.C 提供了ForkJoinWorkerThread类作为 ForkJoinPool
中的工作线程:
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; // 该工作线程归属的线程池
final ForkJoinPool.WorkQueue workQueue; // 对应的任务队列
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super("aForkJoinWorkerThread"); // 指定工作线程名称
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
// ...
}
WorkQueue
任务调度流程
Fork/Join 框架的任务调度流程是什么样的?
任务提交
创建工作线程
任务执行
任务结果获取
任务结果一般通过ForkJoinTask的join
方法获得,其主要流程如下图:
任务结果获取的核心涉及两点:
- 互助窃取:
ForkJoinPool.helpStealer
- 算力补偿:
ForkJoinPool.tryCompensate
如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。