0
点赞
收藏
分享

微信扫一扫

Java并发编程之线程池


为什么需要线程池?

  1. 反复创建线程开销大
  2. 过多的线程会占用太多内存
  3. 管理线程,提高效率

我们知道线程是一种比较昂贵的资源,我们通过程序每创建一个线程去执行,其实操作系统都会对应地创建一个线程去执行我们的任务,当并发数不大时,对系统似乎没什么影响,但当并发数很大时,我们为每一个请求都去创建一个线程,然后等待被调度、执行完任务后再销毁,这样频繁的创建、销毁线程是很耗费系统资源的。

Java并发编程之线程池_创建线程


使用线程池去管理线程,可以很好的减少重复创建销毁的损耗,因为线程池会复用线程,什么是复用线程呢?就是线程池里面的线程,并不和我们自己创建一个线程去执行单个任务一样,执行完这个任务线程就结束了,而线程池中的线程,它的执行逻辑中有一个while循环,在这个while循环中,线程会不断的去获取任务,然后执行(有任务的情况下),如果在高并发环境下,这会极大的减少线程的创建与销毁操作,节约系统的资源。

如下代码:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

Repeatedly gets tasks from queue and executes them这是上面这个方法的一段注释,从方法的代码和注释中,都很明显的展现了线程池中的线程并不是执行完一个任务就结束了,而是会主动(重复)去工作队列获取任务,然后执行。

Java并发编程之线程池_创建线程_02


同时没有节制的创建线程,会消耗大量的内存资源,我们没必要来一个任务,就新建一个线程。就像坐车一样,坐不下也没必要新建太多的车辆,让他们等下一辆就好了(车辆与人数的权衡------线程数和任务数之间的权衡)。而各种各样的线程池能够帮助我们实现这一点,合理控制线程的数量,减少内存消耗。

总结来说:只使用合理数量的线程,并不会无限的创建线程,避免内存占用过多,合理利用内存资源;复用线程,使用相同的线程去执行不同的任务,避免重复创建销毁线程的开销。同时方便管理(线程池不仅仅只是为了线程复用,也可以方便管理,因为所有的线程都集中到了一块)

线程池ThreadPoolExecutor简介

线程池的整体设计,详细见图

Executor:顶层接口

ExecutorService: 扩展接口

AbstractExecutorService:抽象类,实现了主要方法供子类继承使用

ThreadPoolExecutor: 提供了线程池的功能实现,各种方法

Java并发编程之线程池_创建线程_03

主要参数:

Java并发编程之线程池_线程池_04


(下面代码的翻译为使用插件翻译,只能提供个大概的意思)

线程池在完成初始化后,并没有任何线程,接到任务的时候,才会去创建新线程执行任务

//核心线程数大小是保持活动状态的最小工作线程数(不允许超时等),除非设置了 allowCoreThreadTimeOut,在这种情况下,最小值为零。
private volatile int corePoolSize;

//最大线程数大小。 请注意,实际最大值受 CAPACITY 内部限制。
private volatile int maximumPoolSize;

//等待工作的空闲线程的超时时间(以纳秒为单位)。 当存在线程超过 corePoolSize 或允许 CoreThreadTimeOut 时,线程使用此超时时间,也就是说空闲时间超过了keepAliveTime,就会被终止。 否则他们永远等待新的工作。
private volatile long keepAliveTime;

/**
用于保存任务和移交给工作线程的队列。 我们不要求 workQueue.poll() 返回
null 必然意味着 workQueue.isEmpty(),所以只依赖 isEmpty 来查看队列
是否为空(例如,在决定是否从 SHUTDOWN 转换到 TIDYING 时我们必须这样做) .
这适用于特殊用途的队列,例如允许 poll() 返回 null 的 DelayQueues,即使
它稍后可能在延迟到期时返回非 null。
*/
private final BlockingQueue<Runnable> workQueue;

workQueue是BlockingQueue类型,如下图所示,这是一个接口,下面还有其他子接口和具体实现类,这里谨做一个了解

Java并发编程之线程池_高并发_05


新的线程由线程工厂threadFactory创建

Executors.defaultThreadFactory()是一个默认的线程工厂,他实现了ThreadFactory接口,是Executors的一个静态类,用他来创建线程,线程会拥有同样的优先级,并且不是守护线程。

如果自己指定其他的线程工厂,就可以修改线程名,线程组,优先级,设置守护线程等等。

static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

/**
新线程的工厂。 所有线程都是使用这个工厂创建的(通过方法 addWorker)。
所有调用者都必须为 addWorker 失败做好准备,这可能反映了系统或用户限制线程数的策略。
即使它不被视为错误,创建线程失败也可能导致新任务被拒绝或现有任务卡在队列中。 即使在尝
试创建线程时可能会抛出 OutOfMemoryError 等错误时,我们也保留池不变。
由于需要在 Thread.start 中分配本机堆栈,因此此类错误相当常见,并且用户将希望执行
clean pool shutdown 进行清理。 可能有足够的内存可供清理代码完成而不会遇到另一个
OutOfMemoryError
*/
private volatile ThreadFactory threadFactory;

/**
* 在执行中线程饱和或关闭时调用的处理程序。
*/
private volatile RejectedExecutionHandler handler;

添加线程规则

  1. 当有任务进来时,就会创建新线程,直到他达到corePoolSize,那怕其中有线程处于空闲状态
  2. 如果已有线程数大于等于corePoolSize,但小于maximumPoolSize,则将任务放入workQueue队列
  3. 如果workQueue队列已满,并且线程数少于maximumPoolSize,则创建一个新线程来运行任务
  4. 如果workQueue队列已满,并且线程数等于maximumPoolSize,则停止创建新线程

我们可以看到线程池希望保持较少的线程数,只有当任务很多的时候,才会增加新线程,这里就是任务堆满了,才会考虑创建新线程。就好像工作堆的太多了,你就会叫人帮忙,技术来源于生活。

Java并发编程之线程池_创建线程_06


Java并发编程之线程池_创建线程_07

线程池拒绝任务

线程池只有处于RUNNING状态的时候,才会接收新任务
当线程池已经达到最大线程数,并且工作队列使用有界阻塞队列并且已经达到最大容量的时候,会拒绝新任务

Java并发编程之线程池_线程池_08

拒绝策略:

AbortPolicy: 直接抛出异常,表明你没有提交成功
DiscardPolicy: 默默的丢弃你提交的任务,没有通知,不告诉调用线程
DiscardOldestPolicy: 丢弃存放时间最久的任务
CallerRunsPolicy: 线程池已经饱和,不能再加工作量,那个线程提交任务,叫那个线程运行这些任务
分别是四个静态类,ThreadPoolExecutor默认采用AbortPolicy()类来拒绝任务

private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

四种常用线程池简述

  1. newFixedThreadPool 固定线程数量的线程池
  2. newSingleThreadExecutor 单一线程数量的线程池
  3. newCachedThreadPool 会自动创建新线程的线程池
  4. newScheduledThreadPool 设置周期计划任务的线程池

建一个任务类,供线程池执行。

public class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}

}

newFixedThreadPool 线程池

ExecutorService接口继承了Executor接口,它比Executor接口有更丰富的方法,便于我们使用。Executors是Java线程池的工具类。
如下所示,我们创建了1000个任务

public class FixedThreadPoolTest {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}

如图所示,始终都是5个线程在执行任务,并没有开启其他的线程

Java并发编程之线程池_阻塞队列_09

进入构造函数,

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

还是通过ThreadPoolExecutor来new线程池的。
第一个参数是corePoolSize 由自己填写的参数传入
第二个参数是maximumPoolSize 由自己填写的参数传入
第三个参数是keepAliveTime 线程数大于corePoolSize生效
第四个参数是TimeUnit unit(时间单位)
第五个参数是workQueue,这里使用的是LinkedBlockingQueue(无界阻塞队列,不停接收任务),因为线程数是固定的,所以任务只能堆积起来,所以使用无界阻塞队列

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

LinkedBlockingQueue的无参构造,创建的阻塞队列的容量是Integer.MAX_VALUE(并非真的无界,但是也非常大了,所以是一个近似说法)
但是这里会出问题,如果一个业务的任务很多,并且任务处理耗时,而我们指定的线程数太少的话,就会导致工作队列的任务越来越多,最终导致高内存消耗,甚至OOM(Out Of Memory)

这里corePoolSize==maximumPoolSize,所以线程数量永远不可能超过这个nThreads数量,所以线程数量永远不大于corePoolSize,这样keepAliveTime线程失效回收的时间也是没有意义的,所以这里写死了是0,永远不会有线程被回收

newSingleThreadExecutor线程池

public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}

}
}

始终只有一个线程执行任务

Java并发编程之线程池_线程池_10


进入构造函数

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

参数和上面线程池newFixedThreadPool一样,不过前两个参数都变成了1
第一个参数是corePoolSize
第二个参数是maximumPoolSize
第三个参数是keepAliveTime
第四个参数是TimeUnit unit(时间单位)
第五个参数是workQueue,这里使用的是LinkedBlockingQueue(无界阻塞队列),因为只有一个线程执行任务,所有的任务只能堆积起来,所以采用无界阻塞队列

然而newFixedThreadPool不一样的是newSingleThreadExecutor创建的线程池又被一个FinalizableDelegatedExecutorService包装了一下

FinalizableDelegatedExecutorService类如下

static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}

它继承了DelegatedExecutorService类并增加了一个finalize方法,finalize方法会在虚拟机利用垃圾回收清理对象时被调用,换言之,FinalizableDelegatedExecutorService的实例即使不手动调用shutdown方法关闭线程池,虚拟机也会帮你完成此任务,不过从严谨的角度出发,我们还是应该手动调用shutdown方法,毕竟Java的finalize不是C++的析构函数,必定会被调用,Java虚拟机不保证finalize一定能被正确调用,因此我们不应该依赖于它。

DelegatedExecutorService源代码如下

static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

它也是一个线程池,确切的说是线程池的一个代理模式的实现,对它所有方法的调用其实是被委托到它持有的目标线程池上,不过它的功能是被阉割的, 因为他只实现并委托了部分方法,真实线程池存在的那些未被委托的方法在这里将无法使用。

综上所述newSingleThreadExecutor创建的线程池是一个

  • 单线任务处理的线程池
  • shutdown方法必然会被调用
  • 不具备ThreadPoolExecutor所有功能的线程池

newCachedThreadPool线程池

public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Task());
}
}
}

会创建很多线程,并且任务执行的线程也是随机的

Java并发编程之线程池_线程池_11


可缓存线程池,

无界线程池,具有自动回收多余线程的功能

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

第一个参数是corePoolSize
corePoolSize=0说明这种线程池没有核心线程,也就是线程池可以试图去销毁所有闲置的线程

第二个参数是maximumPoolSize
maximumPoolSize=Integer.MAX_VALUE可以认为这种线程池可以无限地创建线程(这里认为Integer.MAX_VALUE为无穷大)

第三个参数是keepAliveTime
线程空闲回收时间,60 秒这里是,因为后面一个参数的单位是秒

第四个参数是TimeUnit unit(时间单位)
第五个参数是workQueue,这里使用的是SynchronousQueue

SynchronousQueue实际上不是一个真正的队列,因为它不会为队列中元素维护存储空间,与其他队列不同的是,它维护一组线程,这些线程在等待着把元素(任务)加入或移出队列,所以可以知道,当任务来了,SynchronousQueue就把任务交给线程池,如果线程池没有空闲的线程,就会创建线程去处理这个任务,这就是为什么这种线程池创建线程看起来比较随意了。但是因为这种最多会创建Integer.MAX_VALUE个线程,在业务请求比较多的时候,可能会创建很多线程,导致高内存消耗的情况,不过空闲线程设定了空闲回收时间,所以高内存消耗不会持续很久,除非一直都有很多的业务请求需要处理。

图示如下:

Java并发编程之线程池_线程池_12

ScheduledThreadPool线程池

public class ScheduledThreadPoolTest {

public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
threadPool.schedule(new Task(), 5, TimeUnit.SECONDS);
}
}

输出

Java并发编程之线程池_高并发_13

/*创建并执行在给定延迟后启用的一次性操作。
参数:
command - 要执行的任务
delay – 从现在开始延迟执行的时间
unit – 延迟参数的时间单位
返回:
一个 ScheduledFuture 表示待完成的任务,其get()方法将在完成时返回null
抛出:
RejectedExecutionException – 如果无法安排任务执行
NullPointerException – 如果命令为空*/
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

delay为5,单位为秒,也就是延迟5秒执行,并且这个调用只会执行一次任务。

测试另一个方法

public class ScheduledThreadPoolTest {

public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
threadPool.scheduleAtFixedRate(new Task(), 1, 1, TimeUnit.SECONDS);
}
}

输出

Java并发编程之线程池_创建线程_14

/*创建并执行一个周期性动作,在给定的初始延迟后首先启用,然后在给定的时间段内启用; 即执行将在initialDelay之后开始,然后是initialDelay+period ,然后是initialDelay + 2 * period ,依此类推。 如果任务的任何执行遇到异常,则后续执行将被抑制。 否则,任务只会通过取消或终止执行程序而终止。 如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
参数:
command - 要执行的任务
initialDelay – 延迟第一次执行的时间
period – 连续执行之间的时间段
unit – initialDelay 和 period 参数的时间单位
返回:
一个 ScheduledFuture 表示待完成的任务,其get()方法将在取消时抛出异常
抛出:
RejectedExecutionException – 如果无法安排任务执行
NullPointerException – 如果命令为空
IllegalArgumentException – 如果周期小于或等于零
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

initialDelay=1、period=1,单位为秒,也就是延迟1秒执行任务,并且以1秒为一个周期一直执行任务。这种线程池,很适合执行具有周期性的任务。

线程池构造方法

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

跳转

/*
使用给定的核心池大小创建一个新的ScheduledThreadPoolExecutor 。
参数:
corePoolSize – 要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
抛出:
IllegalArgumentException – 如果corePoolSize < 0
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

第一个参数是corePoolSize 自己传入

第二个参数是maximumPoolSize
maximumPoolSize=Integer.MAX_VALUE可以认为这种线程池可以无限地创建线程(这里认为Integer.MAX_VALUE为无穷大)

第三个参数是keepAliveTime

第四个参数是TimeUnit unit(时间单位)
第五个参数是workQueue,这里使用的是DelayedWorkQueue(延迟队列)

DelayedWorkQueue队列是定制的优先级队列,只能用来存储RunnableScheduledFutures任务。该队列是基于堆数据结构的实现。
​DelayedWorkQueue 使用及源码分析​​

线程池的线程数怎么设定?

  • CPU密集型:计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。 这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
  • IO密集型:IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。
    01:磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库是在本地的话,那么这个也属于磁盘 IO。
    02:网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。
    IO 操作的特点就是需要等待,我们请求一些数据,需要数据准备,然后由对方将数据写入用户进程缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去。
    CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍或多倍的线程。当线程进行 I/O 操作 CPU 空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率
  • 《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法
    线程数= (IO时间 + CPU工作时间) / CPU工作时间 * CPU核数

References:

  • ​​https://www.jianshu.com/p/2b7d853322bb​​
  • ​​https://baijiahao.baidu.com/s?id=1662096273666037141&wfr=spider&for=pc​​

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源。)


举报

相关推荐

0 条评论