前言
在实际开发中很多人用过线程池但是并不一定懂线程池。有如下几个问题:
- 创建线程比较耗费资源体现在哪儿?
- 线程怎么保活?
- 一个线程是怎么进行Runnable一个又一个的执行的?
- 对于线程池构造参数你能理解它们的意义吗?当一个个任务运行时,你可否知道它们去哪儿,它们什么时候会执行,会结束?它们发生异常之后是怎样的?在设置数量的时候设置多大是合适的?设置时间的时候设置多长?一个线程池,core=2,max=4,queue=1,请问最多几个线程就触发拒绝策略?
- 为什么使用线程池?redis之前是单线程的,也很快。
如果你能回答不需要再往下看。
这里我们学习除了问题1、问题5的其它问题。
自己设计线程池
在学习线程池之前,如果让你设计手写一个线程池,你会怎么写?
要求:
1)有核心线程,任务执行结束后存活。
2)有最大线程数,可以接收多少个线程,并实现相应的拒绝策略。
大概是这样:
public class MyPool implements MyExecutor {
private AtomicInteger count = new AtomicInteger(0);
private AtomicInteger core;
private AtomicInteger max;
private BlockingQueue<Runnable> workQueue;
private ThreadFactory threadFactory;
private MyRejectedExecutionHandler handler;
private static class AbortPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, MyPool executor) {
throw new RejectedExecutionException("拒绝线程");
}
}
public MyPool(int core, int max, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
if (core < 0 || max < 0 || max < core) throw new IllegalArgumentException();
this.core = new AtomicInteger(core);
this.max = new AtomicInteger(max);
this.workQueue = workQueue;
this.threadFactory = threadFactory;
this.handler = new AbortPolicy();
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
@Override
public void execute(Runnable command) {
int c = count.get();
if (c < core.get()) {
count.incrementAndGet();
Thread thread = getThreadFactory().newThread(command);
thread.start();
} else if (c < max.get()) {
count.incrementAndGet();
Thread thread = getThreadFactory().newThread(command);
thread.start();
} else {
handler.rejectedExecution(command,this);
throw new RejectedExecutionException("线程池已满");
}
}
}
遇到的问题 ,参数都没用完,线程不保活,线程池没状态,没生命周期,还有并发问题。
ThreadPoolExecutor
首先来看diagram。

ThreadPoolExecutor的关系是简单的,在AbstractExecutorService是抽象类,主要实现了ExecutorService的功能,这里只看Executor接口及其实现ThreadPoolExecutor。
Executor接口
public interface Executor {
void execute(Runnable command);
}
executor是比较重要的接口,核心接口。
ThreadPoolExecutor实现
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
猛一看云里雾里的,逻辑和我们想的不一样。
第一行是个校验,比较简单。第二行的ctl是什么?中间的addWorker是什么?
线程池控制参数ctl
ctl源码是
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是Main pool control state,线程池控制的核心,ctl的值前3位表示线程池的状态,后29位表示线程池的线程数量。通过一个值就完成了状态和数量的整合,简直完美。
运行状态rs有
| 状态 | 高位比特值 | 数字 |
|---|---|---|
| RUNNING | 111 | -1 |
| SHUTDOWN | 000 | 0 |
| STOP | 001 | 1 |
| TIDYING | 010 | 2 |
| TERMINATED | 011 | 3 |
| 简单总结就是运行态为负数和大于等于0的其它状态。 |
ctlOf方法
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctlOf方法就是通过rs(运行状态)和wc(线程数量)计算出ctl的值。
workerCountOf方法
private static int workerCountOf(int c) { return c & CAPACITY; }
workerCountOf计算worker的数量
runStateOf方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
runStateOf计算线程池的状态
Worker
/**
*Worker是线程池的一个工作单元,继承AQS就保证了一个worker执行时,其它线程不会进来,一个worker本身又是
*一个Runnable,用于创建Thread时候传入worker。
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
//线程池中运行的Thread都是此处的thread,在构造worker的时候就会生成
//当thread.start的时候,会运行此worker中的run方法,能执行多个Runnable,重用一个thread的秘诀就在这里面
final Thread thread;
/** Initial task to run. Possibly null. */
//进来的第一个Runnable任务就会放入这里
//很多人不明白为啥是第一个,worker后面的执行的Runnable都是从workQueue中取的,故此处只是第一个
Runnable firstTask;
/** Per-thread task counter */
//记录了完成的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//runWorker(this)重用线程,不再启动Thread的精髓
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker(Worker w)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//unlock是state初始时是-1
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//精髓在这里
//task为null的时候,任务结束,线程结束。
//task不为null好理解,getTak是从workQueue取,2种,一种是带超时的poll(时间就是初始化的keepAliveTime),一种是阻塞take,这个getTask就保证了线程的存活,任务的执行。
while (task != null || (task = getTask()) != null) {
//worker加锁,进行状态校验
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//空方法,子类可以实现,你可以自己继承这个类,重写这个方法干事
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务的执行,每个任务只是跑run方法,不是启动生成线程
task.run();
} catch (RuntimeException x) {
//...省略代码
} finally {
//空方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
搞明白ctl和worker后,再看execute方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这段的逻辑:当线程数少于核心线程数的时候,生成worker并运行,over。放进workQueue,成功返回over,否则添加worker到max下,这个worker运行结束后就消失了。
线程池中的参数含义
现在咱们来解释线程池中各个参数的含义
/**
*@param corePoolSize 核心线程数,保活的线程,当设置allowCoreThreadTimeOut后保活一定一定时间
*@param maximumPoolSize 最大线程数 此线程池允许的最大线程数
*@param keepAlivetime 当线程数量大于核心数量,过多的空闲线程在终止前会等待新任务的最大时间
*@param unit 和keepAlivetTime使用
*@param workQueue 在任务被执行前持有runnable的队列,只会持有execute提交的Runnable
*@param threadFactory 线程工厂
*@param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
有两个问题:
1.corePoolSize的保活时间是多少
2.keepAliveTime的时间是空闲线程的存活时间 ,代码体现在哪儿?
答:
1.corePoolSize的保活时间是当allowCoreThreadTimeOut设置为true时保活keepAliveTime,为false时是永远。
allowCoreThreadTimeOut默认是false,但是可以设置。在ThreadPoolExecutor中提供了一个公共方法来设置这个值。
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
2.代码
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
至此,线程池的参数全部清楚了。
corePoolSize:核心线程数,当allowCoreThreadTimeOut为true是,核心线程保活一定时间(kepAliveTime),为false时保活永远。
maximumPoolSize:最大线程数,线程池所允许的最大线程数
workQueue:任务队列,当线程数小于核心线程数时就会把任务放入此队列。
在此有三个数core,max,queue,它们三者的关系是什么呢?
线程数<core==>核心线程,
否则 workQueue,
否则 max。
源码
//小于核心线程数,进核心线程
if (workerCountOf(c) < corePoolSize) {
...
}
//核心线程满了,进workQueue
if (isRunning(c) && workQueue.offer(command)) {
...
}
//workQueue满了,直接生成线程,到max了。
else if (!addWorker(command, false))
reject(command);
线程池的生命周期
线程池的生命周期和线程池的状态息息相关。
线程池的状态的变化。

-1–>0–>2–>3
或
-1–>1–>2–>3
线程的状态不会倒回去,也就是说运行状态变成关闭状态的时候,不会再变回运行态。
总结
学习线程池我们可以学习到什么?
- 状态和数量维护到一个值中可以避免两个变量带来的问题。
- 线程保活只是在一个线程线程里面进行Runnable任务的循环执行
- 使用位运算










