文章目录
(一)线程池背景
当程序需要执行大量程序任务时,需要使用多线程来异步处理提升性能时,通常手段时new一个线程来开启执行,会导致程序不断的创建线程和销毁,不利于管理和监控,同时也会导致内核性能急剧下降,线程池由此出现,为了解决内核带来的损耗,所有线程任务统一由线程池来管理,同时也复用了线程的线程,避免了线程的不断创建和销毁,也可以更加方便管理线程。
(二)线程池优势
(三)线程池场景
(四)线程池源码分析
(1)核心成员变量
/**
- 线程池核心变量:
- 线程池主要是通过ctl开实现线程池状态和线程数量
*/
//ctl核心变量,是线程池状态(高三位)和线程数量(低29位)结合体
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数总位数(二进制位):29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程数最大值:00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
- 线程池状态含义:
- Running:接受新任务且处理阻塞队列中的任务
- ShutDown:拒绝新任务且处理阻塞队列中的任务
- Stop:拒绝新任务且不处理阻塞队列任务,同时中断正在处理的线程任务;
- Tidying:所有任务执行完,包含阻塞队列中的任务时,活跃线程数为0时,即将调用terminated方法的状态;
- Terminated:终止状态,terminated方法执行完毕后的状态。
*/
//线程池状态运行:11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//线程池状态拒绝状态:00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//线程池抛弃和拒绝状态:00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//线程池任务执行完任务状态:01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//线程池终止状态:01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
/**
- 线程池辅助方法:
- 主要是为ctl变量服务的方法,获取线程状态、工作线程数和计算新的ctl值
*/
//获取线程池的状态,实际上高三位起作用
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取工作线程数量,实际上低29位起作用
private static int workerCountOf(int c) { return c & CAPACITY; }
//计算新的ctl值即根据当前线程池状态和新的工作线程数量计算
private static int ctlOf(int rs, int wc) { return rs | wc; }
(2)线程池状态转换
Running->Shutdown:显示调用shutdown方法或隐式调用finalize方法(内部包含调用shutdown方法);
Running/Shutdown->Stop:显示调用shutdownNow方法;
Shutdown->Tidying:当线程池和阻塞队列中都为空时;
Stop->Tidying:线程池为空时(Stop本身会自动清空阻塞队列);
Tidying->Terminated:当terminated方法中hook方法执行完时;
(3)线程池七大参数
corePoolSize:核心线程数;
maximunPoolSize:最大线程数;
keeyAliveTime:存活时间;
TimeUnit,存活时间的时间单位;
workQueue:任务队列,如有界阻塞数组队列ArrayBlockingQueue,无界阻塞链表队列LinkedBlockingQueue,
最多1个任务队列SynchronousQueue,优先级队列PriorityBlockingQueue等;
ThreadFactory:创建线程的工厂;
RejectedExecutionHandler:拒绝策略,当任务队列满且最大线程数达到最大值后采取的策略,
如AbortPolicy(抛异常)、CallerRunsPolicy(返回给调用者线程,让调用者执行)、
DiscardOldestPolicy(丢弃最久等待的任务来再次提交新任务到队列中)和DiscardPolicy(直接丢弃且不抛异常)。
说明:最大线程数触发条件,队列满了,线程池就会触发线程创建,用于处理任务;
(4)三大固定方法(Executors工具类不推荐使用)
1)newFixedThreadPool:创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0 说明只要线程个数比核心线程个数多并且当前空闲则回收——适用于长期、比较稳定性的任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//扩展newFixedThreadPool方法比上述方法增加一个指定的创建线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
2)newSingleThreadExecutor创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收——适用于单个任务串行任务执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 扩展newSingleThreadExecutor方法比上述方法增加一个指定的创建线程工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),threadFactory));
}
3)newCachedThreadPool创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为
Integer.MAX_VALUE,并且阻塞队列为同步队列,keeyAliveTime=60说明只要当前线程60s内空闲则回收。这个特殊在于加入到同步队列的任务会被马上被执行,同步队列里面最多只有一个任务——适用于任务多且每个任务执行短的场景
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//扩展newCachedThreadPool方法比上述方法增加一个指定的创建线程工厂
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
}
(5)四大拒绝策略
Jdk内置的四种拒绝策略均实现了RejectExecutionHandler接口中的rejectedExecution方法
[void rejectedExecution(Runnable r, ThreadPoolExecutor executor);]
(6)execute方法(核心)
/*
* 如果线程数小于核心线程数,则创建新线程来执行任务
* 如果线程线程是Running状态且入队成功,再次检查线程池状态是否需要删除任务或增加线程数
* 如果线程入队失败或创建线程失败,执行拒绝策略
*/
public void execute(Runnable command) {
if (command == null)//空指针异常判断
throw new NullPointerException();
int c = ctl.get();//ctl是一个原子类型变量,获取线程池ctl变量
if (workerCountOf(c) < corePoolSize) {//利用ctl进行低29位值与核心线程数对比
if (addWorker(command, true))//创建工作线程(核心中的核心)
return;
c = ctl.get();//再次获取最新值,以防创建线程过程发生变化
}
//线程池处于Running状态时任务添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//重新获取ctl值
if (!isRunning(recheck) && remove(command))//再次检查,线程池状态是否发生了变化
reject(command);//拒绝策略
else if (workerCountOf(recheck) == 0)//判定是否有空闲线程
addWorker(null, false);//增加新线程
}
//不是接受新任务的状态或入队失败后尝试创建新线程(是否超过最大线程数)
else if (!addWorker(command, false))
reject(command);//拒绝策略(实际上代表已经达到最大线程数)
}
(7)addWorker方法(核心中的核心)
/**
* addWorker主要分为两部分
* 第一部分:使用双层自旋对线程池ctl变量进行工作线程+1
* 第二部分:并发安全的将任务添加到工作线程并启动
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {//自旋
int c = ctl.get();//获取ctl值
int rs = runStateOf(c);//获取线程池状态
/**
* 做出对线程池状态不同返回false的处理:
* (1)线程池状态为Stop、Tidying和Terminated,返回false
* (2)线程池状态为Shutdown并且firstTask任务存在,返回false
* (3)线程池的任务队列是空的即检查队列在必要时是否为空,返回false
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/**
* 再次内自旋保证创建新线程
* (1)低于核心线程数时
* (2)大于等于核心线程数且小于最大线程数时
*/
for (;;) {
int wc = workerCountOf(c);//获取低29位线程数
/**
* 工作线程是否超过程序允许的范围
* core值决定阈值是否超过核心线程数或最大线程数
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子类自增工作线程,保证同一时刻只能有一个线程自增成功
if (compareAndIncrementWorkerCount(c))
break retry;//cas成功跳出外层自旋
c = ctl.get(); //重新获取ctl
if (runStateOf(c) != rs)//检查线程池状态是否发生变化
continue retry;
//如果代码执行到这里,则说明内层自旋中cas失败,重新自增
}
}
//代码执行到这里说明ctl中已经成功CAS,让工作线程增加1
/**
* 上面一部分只是将工作线程增加+1,然后重新设置到ctl中
* 此部分是真正产生新线程和从队列中取任务
*/
boolean workerStarted = false;//工作线程开始标志默认false
boolean workerAdded = false;//工作线程增加标志默认false
Worker w = null;
try {
w = new Worker(firstTask);//创建工作线程,根据提交的线程任务
final Thread t = w.thread;//获取任务线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//使用全局锁
mainLock.lock();//获取锁执行权
try {
int rs = runStateOf(ctl.get());//获取线程池状态
/**
* 主要是判断线程池是否接受新任务
* (1)线程池状态是否为接受任务状态Running
* (2)线程池状态为Shutdown且该任务是否为空
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //判断是否已经被激活
throw new IllegalThreadStateException();
workers.add(w);//新的工作线程添加到HashSet中
int s = workers.size();
if (s > largestPoolSize)//不允许超过线程池最大值
largestPoolSize = s;
workerAdded = true;//工作线程添加成功
}
} finally {
mainLock.unlock();//释放全局锁
}
if (workerAdded) {//工作线程增加成功
t.start();//开始调度
workerStarted = true;//标志更改开始执行
}
}
} finally {
if (! workerStarted)//开始start线程失败
addWorkerFailed(w);//增加工作线程失败
}
return workerStarted;
}
(8)工作线程Worker内部方法(runWorker方法)
构造方法:Worker的状态为-1,是为了避免当前worker在调用runWorker方法前被中断(当其它线程调用了线程池的shutdownNow时候,如果worker状态>= 0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。run方法中运行runWorker的代码时候会调用unlock方法,该方法把status变为了0,所以这时候调用shutdownNow会中断worker线程了。
Worker(Runnable firstTask) {
setState(-1); //设置AQS中state状态值为-1,保证其工作线程在runWorker方法被调用之前禁止被中断
this.firstTask = firstTask;//工作任务
this.thread = getThreadFactory().newThread(this);//创建线程
}
执行工作线程run,因Worker实现了Runnable接口,主要是为了调用runWorker方法
public void run() {
runWorker(this);
}
/**
* 工作线程执行真正的run方法,包含了对工作线程的管理、线程池的状态、新线程(核心线程)的开启
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//获取当前线程
Runnable task = w.firstTask;//Runnable 的实例即任务
w.firstTask = null;
w.unlock(); //此处与Worker构造方法中设置-1有关,这里是允许被中断
boolean completedAbruptly = true;//中断标记
try {
//执行任务为null或从队列中任务为null
while (task != null || (task = getTask()) != null) {
w.lock();//获取锁
/**
* 是否对工作线程进行中断条件如下:主要是对shutdown或者shutdownNow操作控制
* (1)当前线程池状态是大于等于Stop状态,如果是则中断线程
* (2)在(1)的检查下是小于Stop状态,则对当前线程是否有中断且再次检查线程池状态是否大于等于Stop
* 同时也要检查工作线程是否有无中断标记,满足这三个条件下则也进行中断
*/
if ((runStateAtLeast(ctl.get(), STOP ) ||
(Thread.interrupted()&&unStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())//检查工作线程是否中断条件
wt.interrupt();//中断
try {
beforeExecute(wt, task);//该方法为工作线程和执行任务前置方法,目前是空,子类可重写
Throwable thrown = null;
try {
task.run();//调度执行任务方法
} catch (RuntimeException x) {//运行时异常
thrown = x; throw x;
} catch (Error x) {//程序出错
thrown = x; throw x;
} catch (Throwable x) {//JVM层次异常
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//该方法为工作线程和执行任务后置方法,目前是空,子类可重写
}
} finally {
task = null;//help GC
w.completedTasks++;//统计当前工作线程完成任务数量
w.unlock();//释放w自身获得的锁执行权,Worker继承了AQS,所以是可以直接使用lock和unlock
}
}
completedAbruptly = false;
} finally {
//执行工作线程之完毕的清理工作
processWorkerExit(w, completedAbruptly);
}
}
(9)processWorkerExit清理方法
/**
* processWorkerExit作用如下:
* (1)统计线程池完成任务数量
* (2)线程状态转换且
* (3)是否开启新线程
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //是否被中断,如果中断则要减少工作线程的数量
decrementWorkerCount();//CAS操作减少ctl的工作线程数量
//统计线程池完成任务的数量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//全局锁,只允许同一时刻一个线程执行统计任务
try {
completedTaskCount += w.completedTasks;//完成任务数量统计
workers.remove(w);//HashSet中去处当前工作线程
} finally {
mainLock.unlock();//释放全局锁
}
/**
* 尝试对线程池的状态转换即向Terminated转变
* (1)如果当前是shutdonw状态并且工作队列为空
* (2)当前是stop状态当前线程池里面没有活动线程
*/
tryTerminate();
int c = ctl.get();//重新获取ctl值
if (runStateLessThan(c, STOP)) {//检查当前线程池状态是否小于Stop状态即Running
if (!completedAbruptly) {//当前工作线程是否被中断
//线程池实例对象调用allowCoreThreadTimeOut(true);//可以设置人为设置
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())//该条件时根据认为设置来确定大小,默认是0
min = 1;
if (workerCountOf(c) >= min)//可能是默认0、1或核心线程数corePoolSize
return; //不需要增加线程且还有任务正在执行,当前工作线程数已达核心线程数
}
addWorker(null, false);//线程不足(未达到核心线程数)可以再次新开启工作线程
}
}
(10)tryTerminate状态转换
/**
* 尝试对线程池的状态转换即向Terminated转变
* (1)如果当前是shutdonw状态并且工作队列为空
* (2)当前是stop状态当前线程池里面没有活动线程
*/
final void tryTerminate() {
for (;;) {//自旋
int c = ctl.get();//获取ctl值
/**
*转换操作退出条件如下:
* (1)线程池状态是Running状态退出向Terminated转变
* (2)线程池状态是小于Tidying状态不可转变
* (3)线程池状态时shutdown且任务队列不为空时不可转变
* 总结:没有任务,也就是空闲状态下可转变
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//执行到这里代表线程池状态为shutdown且队列任务为null或Stop状态
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);//中断工作线程
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//允许一个线程执行状态转变
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//CAS设置ctl状态为0
try {
terminated();//真正转变terminated状态,默认为空,子类可重写自定义逻辑
} finally {
ctl.set(ctlOf(TERMINATED, 0));//ctl值设置为0主要防止子类重写改变
//激活调用条件变量termination的await系列方法被阻塞的所有线程
termination.signalAll();//唤醒所有条件限制阻塞的线程
}
return;
}
} finally {
mainLock.unlock();//释放锁
}
//代码执行到此处代表需要自旋
}
}
(11)shutdown方法
/**
* 设置线程池状态为shutdown后,拒绝新任务提交,仍然继续处理阻塞队列中的任务
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//主锁
try {
checkShutdownAccess();//检查是否有权限shutdown
advanceRunState(SHUTDOWN);//将线程池状态设置为shutdown
interruptIdleWorkers();//中断标志即中断空闲线程,有worker但是不是正在执行的线程
onShutdown(); //子类重写或hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();//释放锁
}
tryTerminate();//状态转变
}
(12)shutdownNow方法
/**
* shutdownNow设置线程池状态为Stop后,拒绝新任务提交且中断当前工作线程
* 返回阻塞队列中的所有等待任务
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查权限
advanceRunState(STOP);//设置线程池状态
interruptWorkers();//中断当前真正执行的所有工作线程
tasks = drainQueue();//返回阻塞队列中的任务
} finally {
mainLock.unlock();
}
tryTerminate();//状态转变
return tasks;
}
(13)awaitTermination方法
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);//超时纳秒值
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//独占锁
try {
for (;;) {自旋
if (runStateAtLeast(ctl.get(), TERMINATED))//直到线程池状态改变为terminated
return true;
if (nanos <= 0)//超时则返回
return false;
nanos = termination.awaitNanos(nanos);//不停的计算超时差
}
} finally {
mainLock.unlock();//释放锁
}
}