0
点赞
收藏
分享

微信扫一扫

线程池ThreadPoolExecutor分析

禾木瞎写 2022-03-12 阅读 43

文章目录

(一)线程池背景

当程序需要执行大量程序任务时,需要使用多线程来异步处理提升性能时,通常手段时new一个线程来开启执行,会导致程序不断的创建线程和销毁,不利于管理和监控,同时也会导致内核性能急剧下降,线程池由此出现,为了解决内核带来的损耗,所有线程任务统一由线程池来管理,同时也复用了线程的线程,避免了线程的不断创建和销毁,也可以更加方便管理线程。

(二)线程池优势

(三)线程池场景

(四)线程池源码分析

(1)核心成员变量

/**

  • 线程池核心变量:
  • 线程池主要是通过ctl开实现线程池状态和线程数量
    */
    //ctl核心变量,是线程池状态(高三位)和线程数量(低29位)结合体
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //线程数总位数(二进制位):29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程数最大值:00011111111111111111111111111111
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

/**

  • 线程池状态含义:
  • Running:接受新任务且处理阻塞队列中的任务
  • ShutDown:拒绝新任务且处理阻塞队列中的任务
  • Stop:拒绝新任务且不处理阻塞队列任务,同时中断正在处理的线程任务;
  • Tidying:所有任务执行完,包含阻塞队列中的任务时,活跃线程数为0时,即将调用terminated方法的状态;
  • Terminated:终止状态,terminated方法执行完毕后的状态。
    */
    //线程池状态运行:11100000000000000000000000000000
    private static final int RUNNING = -1 << COUNT_BITS;
    //线程池状态拒绝状态:00000000000000000000000000000000
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    //线程池抛弃和拒绝状态:00100000000000000000000000000000
    private static final int STOP = 1 << COUNT_BITS;
    //线程池任务执行完任务状态:01000000000000000000000000000000
    private static final int TIDYING = 2 << COUNT_BITS;
    //线程池终止状态:01100000000000000000000000000000
    private static final int TERMINATED = 3 << COUNT_BITS;

/**

  • 线程池辅助方法:
  • 主要是为ctl变量服务的方法,获取线程状态、工作线程数和计算新的ctl值
    */
    //获取线程池的状态,实际上高三位起作用
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    //获取工作线程数量,实际上低29位起作用
    private static int workerCountOf(int c) { return c & CAPACITY; }
    //计算新的ctl值即根据当前线程池状态和新的工作线程数量计算
    private static int ctlOf(int rs, int wc) { return rs | wc; }

(2)线程池状态转换

Running->Shutdown:显示调用shutdown方法或隐式调用finalize方法(内部包含调用shutdown方法);
Running/Shutdown->Stop:显示调用shutdownNow方法;
Shutdown->Tidying:当线程池和阻塞队列中都为空时;
Stop->Tidying:线程池为空时(Stop本身会自动清空阻塞队列);
Tidying->Terminated:当terminated方法中hook方法执行完时;

(3)线程池七大参数

corePoolSize:核心线程数;
maximunPoolSize:最大线程数;
keeyAliveTime:存活时间;
TimeUnit,存活时间的时间单位;
workQueue:任务队列,如有界阻塞数组队列ArrayBlockingQueue,无界阻塞链表队列LinkedBlockingQueue,
最多1个任务队列SynchronousQueue,优先级队列PriorityBlockingQueue等;
ThreadFactory:创建线程的工厂;
RejectedExecutionHandler:拒绝策略,当任务队列满且最大线程数达到最大值后采取的策略,
如AbortPolicy(抛异常)、CallerRunsPolicy(返回给调用者线程,让调用者执行)、
DiscardOldestPolicy(丢弃最久等待的任务来再次提交新任务到队列中)和DiscardPolicy(直接丢弃且不抛异常)。
说明:最大线程数触发条件,队列满了,线程池就会触发线程创建,用于处理任务;

(4)三大固定方法(Executors工具类不推荐使用)

1)newFixedThreadPool:创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0 说明只要线程个数比核心线程个数多并且当前空闲则回收——适用于长期、比较稳定性的任务

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
//扩展newFixedThreadPool方法比上述方法增加一个指定的创建线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(), threadFactory);
}

2)newSingleThreadExecutor创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收——适用于单个任务串行任务执行

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
   // 扩展newSingleThreadExecutor方法比上述方法增加一个指定的创建线程工厂
 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
      return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),threadFactory));
  }

3)newCachedThreadPool创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为
Integer.MAX_VALUE,并且阻塞队列为同步队列,keeyAliveTime=60说明只要当前线程60s内空闲则回收。这个特殊在于加入到同步队列的任务会被马上被执行,同步队列里面最多只有一个任务——适用于任务多且每个任务执行短的场景

public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
  }
//扩展newCachedThreadPool方法比上述方法增加一个指定的创建线程工厂
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(), threadFactory);
  }

(5)四大拒绝策略

Jdk内置的四种拒绝策略均实现了RejectExecutionHandler接口中的rejectedExecution方法
[void rejectedExecution(Runnable r, ThreadPoolExecutor executor);]

(6)execute方法(核心)

/*
 * 如果线程数小于核心线程数,则创建新线程来执行任务
 * 如果线程线程是Running状态且入队成功,再次检查线程池状态是否需要删除任务或增加线程数
 * 如果线程入队失败或创建线程失败,执行拒绝策略
 */
public void execute(Runnable command) {
    if (command == null)//空指针异常判断
        throw new NullPointerException();
    int c = ctl.get();//ctl是一个原子类型变量,获取线程池ctl变量
    if (workerCountOf(c) < corePoolSize) {//利用ctl进行低29位值与核心线程数对比
        if (addWorker(command, true))//创建工作线程(核心中的核心)
            return;
        c = ctl.get();//再次获取最新值,以防创建线程过程发生变化
    }
    //线程池处于Running状态时任务添加到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();//重新获取ctl值
        if (!isRunning(recheck) && remove(command))//再次检查,线程池状态是否发生了变化
            reject(command);//拒绝策略
        else if (workerCountOf(recheck) == 0)//判定是否有空闲线程
            addWorker(null, false);//增加新线程
    }
    //不是接受新任务的状态或入队失败后尝试创建新线程(是否超过最大线程数)
    else if (!addWorker(command, false))
        reject(command);//拒绝策略(实际上代表已经达到最大线程数)
}

(7)addWorker方法(核心中的核心)

/**
 * addWorker主要分为两部分
 * 第一部分:使用双层自旋对线程池ctl变量进行工作线程+1
 * 第二部分:并发安全的将任务添加到工作线程并启动
 */
 private boolean addWorker(Runnable firstTask, boolean core) {
     retry:
     for (;;) {//自旋
         int c = ctl.get();//获取ctl值
         int rs = runStateOf(c);//获取线程池状态
         /**
         * 做出对线程池状态不同返回false的处理:
         * (1)线程池状态为Stop、Tidying和Terminated,返回false
         * (2)线程池状态为Shutdown并且firstTask任务存在,返回false
         * (3)线程池的任务队列是空的即检查队列在必要时是否为空,返回false
         */
         if (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
             return false;
         /**
         * 再次内自旋保证创建新线程
         * (1)低于核心线程数时
         * (2)大于等于核心线程数且小于最大线程数时
         */
         for (;;) {
             int wc = workerCountOf(c);//获取低29位线程数
             /**
             * 工作线程是否超过程序允许的范围
             * core值决定阈值是否超过核心线程数或最大线程数
             */
             if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                 return false;
             //原子类自增工作线程,保证同一时刻只能有一个线程自增成功
             if (compareAndIncrementWorkerCount(c))
                 break retry;//cas成功跳出外层自旋
             c = ctl.get();  //重新获取ctl
             if (runStateOf(c) != rs)//检查线程池状态是否发生变化
                 continue retry;
             //如果代码执行到这里,则说明内层自旋中cas失败,重新自增
         }
     }

     //代码执行到这里说明ctl中已经成功CAS,让工作线程增加1
     /**
     * 上面一部分只是将工作线程增加+1,然后重新设置到ctl中
     * 此部分是真正产生新线程和从队列中取任务
     */
     boolean workerStarted = false;//工作线程开始标志默认false
     boolean workerAdded = false;//工作线程增加标志默认false
     Worker w = null;
     try {
         w = new Worker(firstTask);//创建工作线程,根据提交的线程任务
         final Thread t = w.thread;//获取任务线程
         if (t != null) {
             final ReentrantLock mainLock = this.mainLock;//使用全局锁
             mainLock.lock();//获取锁执行权
             try {
                 int rs = runStateOf(ctl.get());//获取线程池状态
                 /**
                 * 主要是判断线程池是否接受新任务
                 * (1)线程池状态是否为接受任务状态Running
                 * (2)线程池状态为Shutdown且该任务是否为空
                 */
                 if (rs < SHUTDOWN ||
                     (rs == SHUTDOWN && firstTask == null)) {
                     if (t.isAlive()) //判断是否已经被激活
                         throw new IllegalThreadStateException();
                     workers.add(w);//新的工作线程添加到HashSet中
                     int s = workers.size();
                     if (s > largestPoolSize)//不允许超过线程池最大值
                         largestPoolSize = s;
                     workerAdded = true;//工作线程添加成功
                 }
             } finally {
                 mainLock.unlock();//释放全局锁
             }
             if (workerAdded) {//工作线程增加成功
                 t.start();//开始调度
                 workerStarted = true;//标志更改开始执行
             }
         }
     } finally {
         if (! workerStarted)//开始start线程失败
             addWorkerFailed(w);//增加工作线程失败
     }
     return workerStarted;
 }

(8)工作线程Worker内部方法(runWorker方法)

构造方法:Worker的状态为-1,是为了避免当前worker在调用runWorker方法前被中断(当其它线程调用了线程池的shutdownNow时候,如果worker状态>= 0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。run方法中运行runWorker的代码时候会调用unlock方法,该方法把status变为了0,所以这时候调用shutdownNow会中断worker线程了。

Worker(Runnable firstTask) {
    setState(-1); //设置AQS中state状态值为-1,保证其工作线程在runWorker方法被调用之前禁止被中断
    this.firstTask = firstTask;//工作任务
    this.thread = getThreadFactory().newThread(this);//创建线程
}

执行工作线程run,因Worker实现了Runnable接口,主要是为了调用runWorker方法
public void run() {
    runWorker(this);
}
/**
* 工作线程执行真正的run方法,包含了对工作线程的管理、线程池的状态、新线程(核心线程)的开启
*/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();//获取当前线程
    Runnable task = w.firstTask;//Runnable 的实例即任务
    w.firstTask = null;
    w.unlock(); //此处与Worker构造方法中设置-1有关,这里是允许被中断
    boolean completedAbruptly = true;//中断标记
    try {
        //执行任务为null或从队列中任务为null
        while (task != null || (task = getTask()) != null) {
            w.lock();//获取锁
            /**
            * 是否对工作线程进行中断条件如下:主要是对shutdown或者shutdownNow操作控制
            * (1)当前线程池状态是大于等于Stop状态,如果是则中断线程
            * (2)在(1)的检查下是小于Stop状态,则对当前线程是否有中断且再次检查线程池状态是否大于等于Stop
            * 同时也要检查工作线程是否有无中断标记,满足这三个条件下则也进行中断
            */
            if ((runStateAtLeast(ctl.get(), STOP ) ||
                 (Thread.interrupted()&&unStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())//检查工作线程是否中断条件
                wt.interrupt();//中断
            try {
                beforeExecute(wt, task);//该方法为工作线程和执行任务前置方法,目前是空,子类可重写
                Throwable thrown = null;
                try {
                    task.run();//调度执行任务方法
                } catch (RuntimeException x) {//运行时异常
                    thrown = x; throw x;
                } catch (Error x) {//程序出错
                    thrown = x; throw x;
                } catch (Throwable x) {//JVM层次异常
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);//该方法为工作线程和执行任务后置方法,目前是空,子类可重写
                }
            } finally {
                task = null;//help GC
                w.completedTasks++;//统计当前工作线程完成任务数量
                w.unlock();//释放w自身获得的锁执行权,Worker继承了AQS,所以是可以直接使用lock和unlock
            }
        }
        completedAbruptly = false;
    } finally {
        //执行工作线程之完毕的清理工作
        processWorkerExit(w, completedAbruptly);
    }
}

(9)processWorkerExit清理方法

/**
 * processWorkerExit作用如下:
 * (1)统计线程池完成任务数量
 * (2)线程状态转换且
 * (3)是否开启新线程
 */
 private void processWorkerExit(Worker w, boolean completedAbruptly) {
     if (completedAbruptly) //是否被中断,如果中断则要减少工作线程的数量
         decrementWorkerCount();//CAS操作减少ctl的工作线程数量
      //统计线程池完成任务的数量
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();//全局锁,只允许同一时刻一个线程执行统计任务
     try {
         completedTaskCount += w.completedTasks;//完成任务数量统计
         workers.remove(w);//HashSet中去处当前工作线程
     } finally {
         mainLock.unlock();//释放全局锁
     }

     /**
     * 尝试对线程池的状态转换即向Terminated转变
     * (1)如果当前是shutdonw状态并且工作队列为空
     * (2)当前是stop状态当前线程池里面没有活动线程
     */
     tryTerminate();

     int c = ctl.get();//重新获取ctl值
     if (runStateLessThan(c, STOP)) {//检查当前线程池状态是否小于Stop状态即Running
         if (!completedAbruptly) {//当前工作线程是否被中断
             //线程池实例对象调用allowCoreThreadTimeOut(true);//可以设置人为设置
             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
             if (min == 0 && ! workQueue.isEmpty())//该条件时根据认为设置来确定大小,默认是0
                 min = 1;
             if (workerCountOf(c) >= min)//可能是默认0、1或核心线程数corePoolSize
                 return; //不需要增加线程且还有任务正在执行,当前工作线程数已达核心线程数
         }
         addWorker(null, false);//线程不足(未达到核心线程数)可以再次新开启工作线程
     }
 }

(10)tryTerminate状态转换

/**
 * 尝试对线程池的状态转换即向Terminated转变
 * (1)如果当前是shutdonw状态并且工作队列为空
 * (2)当前是stop状态当前线程池里面没有活动线程
 */
 final void tryTerminate() {
     for (;;) {//自旋
         int c = ctl.get();//获取ctl值
         /**
         *转换操作退出条件如下:
         * (1)线程池状态是Running状态退出向Terminated转变
         * (2)线程池状态是小于Tidying状态不可转变
         * (3)线程池状态时shutdown且任务队列不为空时不可转变
         * 总结:没有任务,也就是空闲状态下可转变
         */
         if (isRunning(c) ||
             runStateAtLeast(c, TIDYING) ||
             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
             return;
         //执行到这里代表线程池状态为shutdown且队列任务为null或Stop状态
         if (workerCountOf(c) != 0) { // Eligible to terminate
             interruptIdleWorkers(ONLY_ONE);//中断工作线程
             return;
         }

         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();//允许一个线程执行状态转变
         try {
             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//CAS设置ctl状态为0
                 try {
                     terminated();//真正转变terminated状态,默认为空,子类可重写自定义逻辑
                 } finally {
                     ctl.set(ctlOf(TERMINATED, 0));//ctl值设置为0主要防止子类重写改变
                     //激活调用条件变量termination的await系列方法被阻塞的所有线程
                     termination.signalAll();//唤醒所有条件限制阻塞的线程
                 }
                 return;
             }
         } finally {
             mainLock.unlock();//释放锁
         }
         //代码执行到此处代表需要自旋
     }
 }

(11)shutdown方法

/**
* 设置线程池状态为shutdown后,拒绝新任务提交,仍然继续处理阻塞队列中的任务
*/
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//主锁
    try {
        checkShutdownAccess();//检查是否有权限shutdown
        advanceRunState(SHUTDOWN);//将线程池状态设置为shutdown
        interruptIdleWorkers();//中断标志即中断空闲线程,有worker但是不是正在执行的线程
        onShutdown(); //子类重写或hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();//释放锁
    }
    tryTerminate();//状态转变
}

(12)shutdownNow方法

/**
* shutdownNow设置线程池状态为Stop后,拒绝新任务提交且中断当前工作线程
* 返回阻塞队列中的所有等待任务
*/
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//检查权限
        advanceRunState(STOP);//设置线程池状态
        interruptWorkers();//中断当前真正执行的所有工作线程
        tasks = drainQueue();//返回阻塞队列中的任务
    } finally {
        mainLock.unlock();
    }
    tryTerminate();//状态转变
    return tasks;
}

(13)awaitTermination方法

public boolean awaitTermination(long timeout, TimeUnit unit)
         throws InterruptedException {
     long nanos = unit.toNanos(timeout);//超时纳秒值
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();//独占锁
     try {
         for (;;) {自旋
             if (runStateAtLeast(ctl.get(), TERMINATED))//直到线程池状态改变为terminated
                 return true;
             if (nanos <= 0)//超时则返回
                 return false;
             nanos = termination.awaitNanos(nanos);//不停的计算超时差
         }
     } finally {
         mainLock.unlock();//释放锁
     }
 }
举报

相关推荐

0 条评论