0
点赞
收藏
分享

微信扫一扫

ThreadPoolExecutor线程池原理分析

向上的萝卜白菜 2021-09-28 阅读 132

ThreadPoolExecutor 我们在开发过程中经常用到,它的主要作用就是提前创建好若干个线程放在一个容器中。如果有任务需要处理,则将任务直接分配给线程池中的线程来执行就行,任务处理完以后这个线程不会被销毁,而是等待后续分配任务。

那它是如何实现线程复用的?今天我们就回答这个问题。

一、ThreadPoolExecutor的基本用法

1.1、TheadPooolExecutor的构造函数

public ThreadPoolExecutor(int corePoolSize,         //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
  • corePoolSize 代表缓存的核心线程个数
  • maximumPoolSize 代表线程池最大的线程个数
  • keepAliveTime 代表超时销毁时间,针对非核心线程,空闲若干时间就会被销毁。
  • unit keepAliveTime的时间单位
  • workQueue 尚未执行的任务(Runnable)队列。
  • threadFactory 创建Thread类的工厂
  • handler 任务无法执行时的处理操作

创建线程池

  var threadPool = ThreadPoolExecutor(1,2,60,TimeUnit.SECONDS,LinkedBlockingDeque<Runnable>())

执行线程任务

  threadPool.execute(Runnable { 

})

ThreadPoolExecutor的主要逻辑是:

  • 线程数少于核心线程数,新建线程执行任务
  • 线程数等于核心线程数后,将任务加入阻塞队列
  • 如果队列容量非常大,可以一直添加;如果队列容量有限,队列满了之后,则尝试创建一个非核心线程执行任务。
  • 执行完成任务的线程 反复去任务队列任务来执行。
  • 任务队列为空时,核心线程会阻塞(block),直到有新的任务。

1.2、常用线程池

newFixedThreadPool

newFixedThreadPool 的核心线程数和最大线程数都是指定值,当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限。

public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
}

用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量。
缺点:任务队列没有上限,一直追加可能造成OOM

newCachedThreadPool

newCachedThreadPool 创建一个可缓存线程池。核心线程数为0,最大线程数为Integer.MAX_VALUE。如果线程池长度超过处理需要,可灵活回收空闲线程。
接收到新任务将被立即执行:若有空闲线程,则放到空闲线程执行;若无空闲线程,则创建新的非核心线程来执行。
创建的线程空闲60s 则会被回收。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

缺点:线程可以无限创建,当接收大量任务时 可能创建大量的线程,给JVM过大的负担。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

二、线程池实现原理

ThreadPoolExecutor的核心属性和方法

public class ThreadPoolExecutor extends AbstractExecutorService {

//线程池任务队列
private final BlockingQueue<Runnable> workQueue;
//线程池工作线程
private final HashSet<Worker> workers = new HashSet<>();

//线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c }
private static int workerCountOf(int c) { return c }
private static int ctlOf(int rs, int wc) { return rs | wc; }

public void execute(Runnable command) {

}

public void shutdown() {}

public List<Runnable> shutdownNow() {}
}

2.1、workQueue;

workQueue 是线程池的任务队列,里面缓存待执行的Runnable任务

2.2、workers

workers 是当前正在执行的工作线程集合。

2.3、ctl 线程池状态

ctl是一个原子类,主要作用是用来保存线程数量和线程池的状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

一个 int 数值是 32 个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。

private static int ctlOf(int rs, int wc) { return rs | wc; }
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //将 1 的二进制向右位移 29 位,再减 1 表示最大线程容量
//运行状态保存在 int 值的高 3 位 (所有数值左移 29 位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任务,并执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任务,但是执行队列中的任务
private static final int STOP = 1 << COUNT_BITS;// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
private static final int TIDYING = 2 << COUNT_BITS; //所有的任务都已结束,线程数量为 0,处于该状态的线程池即将调用 terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法执行完成
  • RUNNING 线程池处于运行状态:可以接收新的任务,可以执行任务队列中的任务。
  • SHUTDOWN 线程池处于关闭状态:不接收新的任务,但是任务队列中的任务会继续执行。
  • STOP 线程池处处于停止状态:不接收新的任务,不执行任务队列中的任务,中断正在运行的任务。
  • TIDYING 所有的任务都已经结束,线程数为0。处于此状态的线程池,即将调用terminated()方法。
  • TERMINATED terminate方法执行完成。

2.4、shundown() 和showdonwNow()

  • 执行 shutdown() 线程池会进入SHUTDOWN状态:不接收新的任务,但是任务队列中的任务会继续执行。
  • 执行 shutdownNow() 线程池会进入STOP状态:不接收新的任务,不执行任务队列中的任务,中断正在运行的任务。

2.5、execute() 执行Runnable任务

 public void execute(Runnable command) {


int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //1、工作线程数 小于核心线程个数,则尝试直接创建一个新的核心线程来执行任务。

if (addWorker(command, true))
return;
c = ctl.get();
}

if (isRunning(c) && workQueue.offer(command)) { //2、核心线程数已满,任务队列未满,则将任务添加到任务队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //线程状态二次检查,如果当前线程池关闭了,则移除该任务
reject(command);
else if (workerCountOf(recheck) == 0) //线程状态二次检查,如果当前工作线程数为0,则新建非核心线程来执行剩余任务
addWorker(null, false);
}
else if (!addWorker(command, false)) //3、任务队列已慢,则尝试创建一个新的非核心线程 处理该任务。
reject(command);
}

2.5.1、addWorker

如果工作线程数小于核心线程数的话,会调用 addWorker,创建一个工作线程。
大致做了两件事情:

1、状态检查,不允许创建线程的情况,直接返回false; 条件符合,则工作线程计数+1
  • (1)如果线程处于非运行状态,不允许新创建线程,直接返回false
  • (2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
  • (3)通过 cas工作线程数加1
2、创建Worker实例,添加到workers集合,并开启线程
  • (4)将新创建的 Worker 添加到 workers 集合中
  • (5)如果 worker 添加成功,启动线程
 private boolean addWorker(Runnable firstTask, boolean core) {
retry: //goto 语句,避免死循环

//状态检查,不允许创建线程的情况,直接返回false; workerCount计数+1。
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//(1)如果线程处于非运行状态,不允许新创建线程,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { //自旋
int wc = workerCountOf(c);//获得 Worker 工作线程数
//(2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//(3)通过 cas工作线程数加1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) //这里如果不相等,说明线程的状态发生了变化,继续重试
continue retry;

}
}

//创建Worker实例,添加到workers集合,并开启线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //构建一个 Worker,传入了一个 Runnable 对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {

int rs = runStateOf(ctl.get());
//将新创建的Workder 添加到workers工作线程集合当中。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {

workers.add(w); //(4)将新创建的 Worker 添加到 workers 集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//(5)如果 worker 添加成功,启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线程数(还记得我们最开始的时候增加了工作线程数吗)
}
return workerStarted;//返回结果
}
Worker 是什么?

Worker代表一个工作线程,内部持有一个thread和firstTask。

  • firstTask 是线程初始化时要被首先执行的任务
  • thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。
  • 线程具体的执行操作代理到了外部的runWorker()方法中。
 private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}

}



runWorker 方法

runWorker 是工作线程 执行任务的主要场所。

  • 如果 task 不为空,则开始执行 task
  • 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
  • 执行完毕后,通过 while 循环继续 getTask()取任务
  • 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;

try {
while (task != null || (task = getTask()) != null) {
w.lock();

...
try {
beforeExecute(wt, task);//任务执行前回调
Throwable thrown = null;
try {
task.run(); //执行具体的任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//任务执行后回调
}
} finally {
task = null; //task设置成null,保证下次进入while循环执行getTask()
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//线程执行完成,将当前worker 从wokers集合移除
}
}

整体看就是一个无限循环:

从队列提取任务->执行任务->队列提取任务->执行任务

当getTask() 返回空任务时,循环结束,该工作线程也就结束了。

getTask

线程池之所以可以复用线程,关键点就在getTask。

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {//自旋
int c = ctl.get();
int rs = runStateOf(c);

//(1)对线程池状态的判断:
//线程池处于SHUTDOWN状态 且任务队列为空, 或者 线程池处于STOP状态,应该销毁当前线程,此时返回null。

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;//返回 null,则当前 worker 线程会退出
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// timed 变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制


//(2) 工作队列中有未执行的任务 且工作线程数量超过了maximumPoolSize
//或者上一次取任务已经超时,则 返回null,当前线程退出。

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在
keepaliveTime 时间内没有获取到任务,则返回 null.
否则通过 take 方法阻塞式获取队列中的任务*/


//(3) timed = true,表示需要进行超时控制,采用workQueue.poll() 从队列中取任务,会阻塞keepAliveTime时间,超时后 timedOut = true,下次循环会直接返回null。 - 线程被回收。
timed = false,表示不需要进行超时控制,采用workQueue.take(),该线程会一直阻塞,直到有新的任务。 - 线程不被回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理
return r;
timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收
} catch (InterruptedException retry) {
timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
}
}
}



关键代码如下;

  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

默认 allowCoreThreadTimeOut为false

  • 对于核心线程timed =false,workQueue.take() 取下一个任务,workQueue是LinkedBlockQueue,没有新任务时会一直阻塞在这里(线程不会被释放),当用新任务添加进来,take()返回新任务,线程继续执行。
  • 对于非核心线程(wc > corePoolSize),采用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方式提取新任务,当workQueue为空时,会等待keepAliveTime,时间超时后,返回null。当前线程就会被释放。

至此 ThreadPoolExecutor 可以服用线程的原理也就清除了。

processWorkerExit

runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit,将当前线程从worker中移除,并执行tryTerminate()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

2.6、拒绝策略

当工作线程数超过corePoolSize,且工作队列已满,且线程总数也达到了maximumPoolSize(非核心线程也无法创建),此时就会拒绝新任务:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

三、其他

3.1、如何取消任务

ThreadPoolExecutor.execute() 提交一个任务是无法取消该任务的。

那提交了一个任务之后,想取消执行该任务 有没有途径呢?

答案是submit()方法

   public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

submit方法返回一个Future对象

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future.cancel(true) 可以取消Runnable任务。

  • 任务尚未执行,调用cancel()后,该任务就永远不会被执行
  • 任务正在执行,mayInterruptIfRunning=true时,会interrupted Thread.

测试代码


fun testThreadPool(){
var threadPool = ThreadPoolExecutor(1,5,60,
TimeUnit.SECONDS,
LinkedBlockingDeque<Runnable>()
);

var fu = threadPool.submit(MyRunaable())

Thread.sleep(10000)
fu.cancel(true)
Log.d("feifei","fu cancel")
}

class MyRunaable():Runnable{
var index = 0;
override fun run() {

while(true){
Thread.sleep(1000)
Log.d("feifei","MyRunaable run():${index++}")
}
}
}

输出结果:

2021-01-12 15:12:29.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():0
2021-01-12 15:12:30.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():1
2021-01-12 15:12:31.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():2
2021-01-12 15:12:32.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():3
2021-01-12 15:12:33.927 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():4
2021-01-12 15:12:34.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():5
2021-01-12 15:12:35.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():6
2021-01-12 15:12:36.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():7
2021-01-12 15:12:37.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():8
2021-01-12 15:12:38.925 6847-6847/com.sogou.iot.myapplication D/feifei: fu cancel

3.2、线程池的监控

如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状态,当出现问题的时候可以快速定位到问题。而线程池提供了相应的扩展方法,我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控,简单给大家演示一个案例

3.3、使用线程池注意事项

3.3.1、 使用ThreadPoolExecutor而非Executors创建线程池

用 Executors创建线程池 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致一个问题。
比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题。

3.3.2、合理设置线程池大小。

需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型

  • 如果是 CPU 密集型:主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高。CPU 核心数=最大同时执行线程数,加入 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。
  • 如果是 IO 密集型:主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 出于空闲状态,导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。一般可以配置 cpu 核心数的 2 倍。

四、参考文章

https://www.jianshu.com/p/a977ab6704d7

https://www.jianshu.com/p/7b2da1d94b42

举报

相关推荐

0 条评论