图解线程池ThreadPoolExecutor、定时调度 使用场景和原理

罗子僧

关注

阅读 29

05-12 21:01

点击上方“程序员蜗牛g”,选择“设为星标”跟蜗牛哥一起,每天进步一点点

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列

程序员蜗牛g大厂程序员一枚 跟蜗牛一起 每天进步一点点31篇原创内容公众号

了解并合理使用线程池,是一个开发人员必修的基本功。

  • Exchanger 的使用和原理

线程池是什么?

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_02

Chaya:“线程池有什么好处呢?”

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_03

使用场景

不聊原理,先说下如何使用。

Web 应用的并发请求处理

Web 应用通常需要同时处理多个用户的请求。为了不每个请求都创建一个新线程,可以使用线程池来复用一定数量的线程:

public class WebServer {
    // 创建固定大小的线程池¡¡以处理用户请求
    privatestaticfinal ExecutorService executor = Executors.newFixedThreadPool(100);

    public static void handleRequest(HttpRequest request) {

        CompletableFuture.runAsync(() -> {
            // 处理请求
            processRequest(request);
        }, executor);

    }

    private static void processRequest(HttpRequest request) {
        // 处理请求的实现
    }
}

后台任务和定时任务

应用程序可能需要定期执行一些后台任务,如数据库的清理工作。

可以使用ScheduledThreadPoolExecutor来安排这些任务:

public class BackgroundJobScheduler {
    privatestaticfinal ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    public static void startCleanupJob() {
        // 这里执行清理任务
        scheduler.scheduleAtFixedRate(BackgroundJobScheduler::performCleanup
                , 0, 1, TimeUnit.HOURS);
    }

    private static void performCleanup() {
        // 清理工作的实现
    }
}

异步操作

用户下单后可能需要进行一系列后台操作,比如发送确认邮件、通知仓库出货等。

public class ECommerceApplication {
    privatestaticfinal ExecutorService pool = Executors.newCachedThreadPool();

    public static void completeOrder(Order order) {
        // 异步发送确认邮件
        pool.execute(() -> sendConfirmationEmail(order));

        // 异步通知仓库
        pool.execute(() -> notifyWarehouse(order));
    }

    private static void sendConfirmationEmail(Order order) {
        // 邮件发送逻辑
    }

    private static void notifyWarehouse(Order order) {
        // 仓库通知逻辑
    }
}

Executor 线程池核心设计

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_04


public interface Executor {
    void execute(Runnable command);
}

我们首先来看一下 ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask、ForkJoinPool的 UML 类图,全局上了解下线程池的继承关系。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_05

ExecutorService接口增加了一些能力:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_06

  • 扩充执行任务的能力:可以为一个或一批异步任务生成 Future 的方法;
  • 提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService则是上层的抽象类,实现了 ExecutorService,模板方法模式的运用,将执行任务的流程串联了起来,由子类继承,将变化点交给子类实现,保证下层的实现只需关注一个执行任务的方法即可。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_07

ScheduledThreadPoolExecutor 线程池的特性是定时调度,专门设计了一个接口 ScheduledExecutorService 并继承接口 ExecutorService,这就是单一职责的体现了,家人们!

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_08

该接口主要定义了定时调度的方法。

最下层就分别有 ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool

ThreadPoolExecutor 原理

ThreadPoolExecutor 运行机制是什么?如何同时维护线程状态和执行任务的呢?

核心组件关系如下图所示:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_09

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_10

任务运行机制

“从全局视角说下线程池的运行机制把。”

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_11


其运行机制如下图所示:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_12


图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_13

流程图如下:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_14

为了让你更容易理解,再画一个时序图。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_15

  • Executor:线程池任务调度入口;
  • Queue:阻塞队列
  • Worker:实现 Runnable 并继承 AbstractQueuedSynchronizer,线程池中的任务线程抽象。
  • RejectedHandler:拒绝策略。

接下来我们分别分析线程池核心组件的作用和实现原理。

状态控制(ctl 变量)

如何维护线程池运行状态和工作线程呢?

使用 32 位整型 AtomicInteger 同时维护线程池的运行状态和工作线程:

  • 高 3 位:线程池状态(RUNNING, SHUTDOWN, STOP 等)
  • 低 29 位:工作线程数量

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_16

Java 中的线程池具有不同的状态,这些状态反映了线程池在其生命周期中的不同阶段和行为。主要的线程池状态有以下几种:

RUNNING

(运行中)

表示线程池正在正常运行,并且可以接受新的任务提交。在这种状态下,线程池可以执行任务,并且可以创建新的线程来处理任务。

SHUTDOWN

(关闭中)

表示线程池正在关闭中。在这种状态下,线程池不再接受新的任务提交,但会继续执行已提交的任务,直到所有任务执行完成。

STOP

(停止)

表示线程池已经停止,不再接受新的任务提交,并且尝试中断正在执行的任务。

TERMINATED

(终止)

表示线程池已经终止,不再接受新的任务提交,并且所有任务已经执行完成。在这种状态下,线程池中的所有线程都已经被销毁。

通过 ctl 字段,ThreadPoolExecutor 类能够高效地维护线程池的状态和线程数量信息,从而实现了对线程池的有效管理和控制。

“爱一个人会变,线程池状态又如何变化呢?”

线程池的状态不是直接设置的,而是通过调用 shutdown()、shutdownNow() 等方法触发状态的转换。

例如,调用 shutdown() 方法会将线程池的状态从 RUNNING 转换为 SHUTDOWN

其生命周期转换如下入所示:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_17

阻塞队列——任务缓冲

线程池中是通过阻塞队列实现生产者-消费者模式。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_18

使用不同的队列可以实现不一样的任务存取策略。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_19

拒绝策略

“如果无止尽的海量任务丢给线程池,线程池处理不过来了?”

这时候就要设计一个拒绝策略了,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_20

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_21

除了上述标准拒绝策略之外,您还可以实现 RejectedExecutionHandler 接口来定义自定义的拒绝策略。

这样你就可以根据应用程序的需求实现更复杂的拒绝逻辑。

RejectedExecutionHandler 接口:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_22

Worker 线程管理

“线程池如何管理线程的状态和生命周期呢?”

设计了一个工作线程 Worker 来管理。

private finalclass Worker
    extends AbstractQueuedSynchronizer
    implements Runnable {

    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1); // 禁止中断直到runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // 省略AQS方法实现...
}

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_23

Worker 执行任务的模型如下图所示:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_24

线程池如何管理线程生命周期?

线程池使用一张 HashSet 表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。

Worker 线程复用

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_25


线程池线程回收复用过程如图所示:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_26

Worker 线程增加

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_27

执行流程如下图所示:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_28

Worker 线程垃圾回收

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_29


图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_30

图10 线程销毁流程

线程回收的工作是在 processWorkerExit 方法完成的。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 异常终止才需要补偿
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w); // 从集合中移除
    } finally {
        mainLock.unlock();
    }

    tryTerminate(); // 尝试终止线程池

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 计算最小保留线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false); // 补充新Worker
    }
}

场景类型

触发条件

处理方式

正常退出

getTask()返回 null

检查是否需要补充新 Worker

异常退出

任务执行抛出未捕获异常

立即补充新 Worker

配置变更

核心线程数调整

动态调整存活 Worker 数量

线程池关闭

shutdown/shutdownNow 调用

中断所有 Worker 并清空队列

Worker 线程执行任务

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_31

任务执行主流程如下:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_32

源码分析:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取Worker锁
            // 检查线程池状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); // 实际执行任务
                } catch (Throwable x) {
                    thrown = x;
                    throw x;
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

设计亮点分析

  1. 无锁化设计:通过 CAS 操作修改 ctl 状态,避免全局锁竞争
  2. 线程复用Worker 循环从队列获取任务,减少线程创建开销
  3. 弹性扩容corePoolSize 维持常驻线程,maximumPoolSize 应对突发流量
  4. 优雅降级:队列缓冲+拒绝策略防止资源耗尽

ScheduledThreadPoolExecutor

有了前文的线程池实现原理做铺垫,掌握 ScheduledThreadPoolExecutor 就轻松多了。

“ScheduledThreadPoolExecutor 是什么?”

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_33

使用场景

光说不练假把式,想要学习一个框架的原理,第一步先要理解使用场景,并把它跑起来。

1. 定时任务调度

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_34

image-20250426171745139

代码案例如下:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

// 系统启动后5秒执行初始化
scheduler.schedule(() -> initConfig(), 5, TimeUnit.SECONDS);

// 每天9:30执行数据归档(需计算初始延迟)
long initialDelay = calculateDelay(9, 30);
scheduler.scheduleAtFixedRate(() -> archiveData(),
    initialDelay, 24 * 60 * 60, TimeUnit.SECONDS);

2. 心跳监测机制

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_35

scheduler.scheduleWithFixedDelay(() -> {
    try {
        HeartbeatResponse res = httpClient.checkHealth();
        if (res.isHealthy()) {
            resetFailureCount();
        }
    } catch (TimeoutException e) {
        if (incrementAndGetFailureCount() > 3) {
            alertSystem.sendCriticalAlert();
        }
    }
}, 0, 30, TimeUnit.SECONDS); // 立即开始,间隔30秒

最佳防御式编程示例

scheduler.scheduleAtFixedRate(() -> {
    try {
        // 业务代码
        processBusinessLogic();

        // 添加健康检查点
        if (!systemStatus.isHealthy()) {
            thrownew ServiceUnavailableException();
        }
    } catch (BusinessException e) {
        // 业务可恢复异常
        logger.warn("业务处理警告", e);
    } catch (Throwable t) {
        // 不可恢复异常处理
        logger.error("致命错误触发任务终止", t);
        emergencyRepair();
    }
}, 0, 1, TimeUnit.MINUTES);

优雅关闭

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_36

public void gracefulShutdown(ScheduledThreadPoolExecutor executor) {
    executor.shutdown(); // 禁止新任务提交
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            List<Runnable> dropped = executor.shutdownNow();
            logger.warn("强制关闭,丢弃{}个任务", dropped.size());
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                logger.error("线程池未完全关闭");
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        executor.shutdownNow();
    }
}

实现原理

看下类图,可以发现 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor,并实现了 ScheduledExecutorService接口。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_37

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_38


public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 安排一个Runnable任务在给定的延迟后执行。
     *
     * @param command 需要执行的任务
     * @param delay 延迟时间
     * @param unit 时间单位
     * @return 可用于提取结果或取消的ScheduledFuture
     */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * 安排一个Callable任务在给定的延迟后执行。
     *
     * @param callable 需要执行的任务
     * @param delay 延迟时间
     * @param unit 时间单位
     * @return 可用于提取结果或取消的ScheduledFuture
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * 安排一个Runnable任务在给定的初始延迟后首次执行,随后每个period时间间隔执行一次。
     *
     * @param command 需要执行的任务
     * @param initialDelay 首次执行的初始延迟
     * @param period 连续执行之间的时间间隔
     * @param unit 时间单位
     * @return 可用于提取结果或取消的ScheduledFuture
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 安排一个Runnable任务在给定的初始延迟后首次执行,随后每次完成任务后等待指定的延迟再次执行。
     *
     * @param command 需要执行的任务
     * @param initialDelay 首次执行的初始延迟
     * @param delay 每次执行结束后的延迟时间
     * @param unit 时间单位
     * @return 可用于提取结果或取消的ScheduledFuture
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

ScheduledExecutorService 接口继承了 ExecutorService 接口,并增加了几个定时相关的接口方法。前两个方法用于单次调度执行任务,区别是有没有返回值。

核心结构

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_39

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_40


图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_41

可以转换成如下的数组:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_42

在这种结构中,可以发现有如下特性。假设,索引值从 0 开始,子节点的索引值为 k,父节点的索引值为 p,则:

  • 一个节点的左子节点的索引为:k = p * 2 + 1;
  • 一个节点的右子节点的索引为:k = (p + 1) * 2;
  • 一个节点的父节点的索引为:p = (k - 1) / 2。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_43

任务调度执行全流程。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_44

我们先来看下 ScheduledThreadPoolExecutor 的构造方法,其实在 executors 框架概述中讲 Executors 时已经接触过~

Executors 使用 newScheduledThreadPool 工厂方法创建 ScheduledThreadPoolExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

我们来看下 ScheduledThreadPoolExecutor 的构造器,内部其实都是调用了父类 ThreadPoolExecutor 的构造器,这里最需要注意的就是任务队列的选择——DelayedWorkQueue.

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }


    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), handler);
    }


    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

执行过程

ScheduledThreadPoolExecutor 的核心调度方法是 schedulescheduleAtFixedRatescheduleWithFixedDelay,我们通过 schedule 方法来看下整个调度流程:

// delay时长后执行任务command,该任务只执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // 这里的decorateTask方法仅仅返回第二个参数
    RunnableScheduledFuture<?> t = decorateTask(command,
                                     new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
    // 延时或者周期执行任务的主要方法,稍后统一说明
    delayedExecute(t);
    return t;
}

上述的 decorateTask 方法把 Runnable 任务包装成 ScheduledFutureTask,用户可以根据自己的需要覆写该方法。

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /**
     * 任务序号, 自增唯一
     */
    privatefinallong sequenceNumber;

    /**
     * 首次执行的时间点
     */
    privatelong time;

    /**
     * 0: 非周期任务
     * >0: fixed-rate任务
     * <0: fixed-delay任务
     */
    privatefinallong period;

    /**
     * 在堆中的索引
     */
    int heapIndex;

    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // ...
}

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_45

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())   // 线程池已关闭
        reject(task);   // 任务拒绝策略
    else {
        super.getQueue().add(task);                 // 将任务入队

        // 如果线程池已关闭且该任务是非周期任务, 则将其从队列移除
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);  // 取消任务
        else
            ensurePrestart();   // 添加一个工作线程
    }
}

ScheduledThreadPoolExecutor 的整个任务调度流程大致如下图:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_46

我们来分析这个过程:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_47

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_48

ForkJoinPool

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_49

分治任务模型

分治任务模型可分为两个阶段:

  • 一个阶段是 任务分解,就是迭代地将任务分解为子任务,直到子任务可以直接计算出结果;
  • 另一个阶段是 结果合并,即逐层合并子任务的执行结果,直到获得最终结果。

下图是一个简化的分治任务模型图,你可以对照着理解。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_50

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_51


工作窃取

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_52

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_53

有的线程执行比较快,如何提高效率让闲着去抢任务呢?

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_54


图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_55

工作线程worker1worker2以及worker3都从taskQueue的尾部popping获取task,而任务也从尾部Pushing,当worker3队列中没有任务的时候,就会从其他线程的队列中取stealing,这样就使得worker3不会由于没有任务而空闲。

时序图如下:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_56

案例

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_57

  • 大数据处理:数组排序、矩阵运算(参考案例:百万级数据求和效率提升 3 倍)
  • 分治算法:快速排序、归并排序、斐波那契数列计算。
  • 并行流基础:Java 8 的parallelStream()底层实现。

大数据集并行处理

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_58

public interface Calculator {
     /**
      * 把传进来的所有numbers 做求和处理
      *
      * @param numbers
      * @return 总和
      */
     long sumUp(long[] numbers);
 }

ForkJoinCalculator 实现 Calculator 接口,内部类 SumTask 继承 RecursiveTask 抽象类,并在 compute 方法中定义拆分逻辑及计算。

最后在 sumUp 方法中调用 pool 方法进行计算。

public class ForkJoinCalculator implements Calculator {

     private ForkJoinPool pool;

     // 1. 定义计算逻辑
     privatestaticclass SumTask extends RecursiveTask<Long> {
         privatelong[] numbers;
         privateint from;
         privateint to;

         public SumTask(long[] numbers, int from, int to) {
             this.numbers = numbers;
             this.from = from;
             this.to = to;
         }

         //此方法为ForkJoin的核心方法:对任务进行拆分  拆分的好坏决定了效率的高低
         @Override
         protected Long compute() {

             // 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
             if (to - from < 6) {
                 long total = 0;
                 for (int i = from; i <= to; i++) {
                     total += numbers[i];
                 }
                 return total;
             } else {
                 // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定
                 int middle = (from + to) / 2;
                 SumTask taskLeft = new SumTask(numbers, from, middle);
                 SumTask taskRight = new SumTask(numbers, middle + 1, to);
                 taskLeft.fork();
                 taskRight.fork();
                 return taskLeft.join() + taskRight.join();
             }
         }
     }

     public ForkJoinCalculator() {
         // 也可以使用公用的线程池 ForkJoinPool.commonPool():
         // pool = ForkJoinPool.commonPool()
         pool = new ForkJoinPool();
     }

     @Override
     public long sumUp(long[] numbers) {
         Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
         pool.shutdown();
         return result;
     }
 }

实现原理

ForkJoinPool 作为 Executors 框架的一员,从外部看与其它线程池并没有什么区别,仅仅是 ExecutorService 的一个实现类。

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_59

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_60

核心类关系图。


图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_61

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_62


ForkJoinPool 对象的构建有两种方式:

  1. 通过 3 种构造器的任意一种进行构造;
  2. 通过ForkJoinPool.commonPool()静态方法构造。

JDK8 以后,ForkJoinPool 又提供了一个静态方法 commonPool(),这个方法返回一个 ForkJoinPool 内部声明的静态 ForkJoinPool 实例,主要是为了简化线程池的构建,这个 ForkJoinPool 实例可以满足大多数的使用场景。

public static ForkJoinPool commonPool() {
     // assert common != null : "static init error";
     return common;
 }

ForkJoinTask

从 Fork/Join 框架的描述上来看,“任务”必须要满足一定的条件:

  1. 支持 Fork,即任务自身的分解
  2. 支持 Join,即任务结果的合并

因此,J.U.C 提供了一个抽象类——ForkJoinTask,来作为该类 Fork/Join 任务的抽象定义:

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_63

public abstractclass RecursiveAction extends ForkJoinTask<Void> {
    /**
     * 该任务的执行,子类覆写该方法
     */
    protected abstract void compute();

    public final Void getRawResult() { returnnull; }

    protected final void setRawResult(Void mustBeNull) { }

    protected final boolean exec() {
        compute();
        returntrue;
    }
}

public abstractclass RecursiveTask<V> extends ForkJoinTask<V> {

    /**
     * 该任务的执行结果.
     */
    V result;

    /**
     * 该任务的执行,子类覆写该方法
     */
    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        returntrue;
    }

}

ForkJoinWorkerThread

Fork/Join 框架中,每个工作线程(Worker)都有一个自己的任务队列(WorkerQueue), 所以需要对一般的 Thread 做些特性化处理,J.U.C 提供了ForkJoinWorkerThread类作为 ForkJoinPool 中的工作线程:

public class ForkJoinWorkerThread extends Thread {

    final ForkJoinPool pool;                    // 该工作线程归属的线程池
    final ForkJoinPool.WorkQueue workQueue;     // 对应的任务队列

    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super("aForkJoinWorkerThread");         // 指定工作线程名称
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    // ...
}

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_64

WorkQueue

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_阻塞队列_65

任务调度流程

Fork/Join 框架的任务调度流程是什么样的?

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_66

任务提交

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_67

创建工作线程

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_工作线程_68

任务执行

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_69

任务结果获取

任务结果一般通过ForkJoinTaskjoin方法获得,其主要流程如下图:

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_70

任务结果获取的核心涉及两点:

  • 互助窃取:ForkJoinPool.helpStealer
  • 算力补偿:ForkJoinPool.tryCompensate

图解线程池ThreadPoolExecutor、定时调度 使用场景和原理_线程池_71


如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

精彩评论(0)

0 0 举报