0
点赞
收藏
分享

微信扫一扫

SpringBoot2.x整合线程池(ThreadPoolTaskExecutor)

JAVA && Spring && SpringBoot2.x — 学习目录

1. SpringBoot对线程池的自动配置

源代码:org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration

    /**
     * Creates the {@link TaskExecutorBuilder} bean from the bound task-execution properties.
     *
     * <p>The builder is populated with the pool's queue capacity, core size, max size,
     * core-thread-timeout flag and keep-alive, plus the thread name prefix, the registered
     * customizers and the task decorator (only if exactly one is available).
     * {@code @ConditionalOnMissingBean} means it backs off when the application defines
     * its own builder bean.
     *
     * @return the configured builder
     */
    @Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder() {
// Pool-specific settings live in the nested Pool properties object.
TaskExecutionProperties.Pool pool = this.properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
// Each call's result is reassigned, so the builder returned by every step is the one carried forward.
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.taskExecutorCustomizers);
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
return builder;
}

/**
 * Builds the default {@link ThreadPoolTaskExecutor} from the supplied builder.
 *
 * <p>The bean is lazy, is registered under both the application task executor name and the
 * default name looked up by {@code AsyncAnnotationBeanPostProcessor}, and is only created
 * when no other {@code Executor} bean is present.
 *
 * @param builder the builder holding the executor settings
 * @return the executor produced by {@code builder.build()}
 */
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })

@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}

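我们可以在配置文件中配置线程池的相关参数(例如 spring.task.execution.pool.core-size、spring.task.execution.pool.max-size、spring.task.execution.pool.queue-capacity 等)。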

2. 自定义线程池

2.1 根据业务配置不同的线程池

/**
 * Declares the application's asynchronous executors and enables {@code @Async} processing.
 *
 * <p>Each bean is a separately configured thread pool; {@code @Async} methods choose one
 * by bean name, e.g. {@code @Async("asyncServiceExecutor")}.
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    /**
     * Pool used for general async service work: 5 threads, queue of 50, threads named
     * {@code async-service-N}. Uses the logging executor subclass so every submission
     * prints the pool state.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        applyPoolSettings(executor, 5, "async-service-");
        // PrintingPolicy is not shown in this file; presumably it reports rejected tasks
        // instead of throwing like ThreadPoolExecutor.AbortPolicy - confirm against its source.
        executor.setRejectedExecutionHandler(new PrintingPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * Pool used for custom service work: 10 threads, queue of 50, threads named
     * {@code custom-service-N}. Rejected tasks raise an exception (abort policy).
     *
     * @return the initialized executor
     */
    @Bean
    public Executor customServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        applyPoolSettings(executor, 10, "custom-service-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * Applies the settings both pools share: core size equal to max size, core threads
     * allowed to time out, a queue capacity of 50 and the given thread name prefix.
     */
    private static void applyPoolSettings(ThreadPoolTaskExecutor executor, int poolSize, String namePrefix) {
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix(namePrefix);
    }
}
/**
 * {@link ThreadPoolTaskExecutor} variant that logs a snapshot of the pool's counters
 * (task count, completed count, active threads, queue size) each time a task is handed
 * to it, then delegates to the parent implementation.
 */
@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    @Override
    public void execute(Runnable task) {
        logPoolSnapshot("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        logPoolSnapshot("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        logPoolSnapshot("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        logPoolSnapshot("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        logPoolSnapshot("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        logPoolSnapshot("2. do submitListenable");
        return super.submitListenable(task);
    }

    /**
     * Logs the current pool counters, tagged with the thread name prefix and the
     * submission path that triggered the log line. Does nothing if no underlying
     * pool is available.
     *
     * @param action label identifying which submit/execute overload was called
     */
    private void logPoolSnapshot(String action) {
        ThreadPoolExecutor pool = getThreadPoolExecutor();
        if (pool != null) {
            log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                    this.getThreadNamePrefix(),
                    action,
                    pool.getTaskCount(),
                    pool.getCompletedTaskCount(),
                    pool.getActiveCount(),
                    pool.getQueue().size());
        }
    }
}

2.2 如何使用线程池

/**
 * Async task with a return value, run on the {@code asyncServiceExecutor} pool.
 * Sleeps for 5 seconds to simulate work, logs the elapsed time and returns a
 * completed result.
 *
 * @return a future holding the completion message
 * @throws InterruptedException if the simulated work is interrupted
 */
@Async("asyncServiceExecutor")
public Future<String> doTask1() throws InterruptedException {
    log.info("Task1 started.");
    final long startedAt = System.currentTimeMillis();
    Thread.sleep(5000);
    final long elapsedMs = System.currentTimeMillis() - startedAt;

    log.info("Task1 finished, time elapsed: {} ms.", elapsedMs);

    return new AsyncResult<>("Task1 accomplished!");
}

/**
 * Async task with a return value, run on the {@code customServiceExecutor} pool.
 * Sleeps for 3 seconds to simulate work, logs the elapsed time and returns a
 * completed result.
 *
 * @return a future holding the completion message
 * @throws InterruptedException if the simulated work is interrupted
 */
@Async("customServiceExecutor")
public Future<String> doTask2() throws InterruptedException {
    log.info("Task2 started.");
    final long startedAt = System.currentTimeMillis();
    Thread.sleep(3000);
    final long elapsedMs = System.currentTimeMillis() - startedAt;

    log.info("Task2 finished, time elapsed: {} ms.", elapsedMs);

    return new AsyncResult<>("Task2 accomplished!");
}
    /**
     * Fire-and-forget task (no return value, i.e. a Runnable-style job) run on the
     * {@code asyncServiceExecutor} pool. Sleeps for one second to simulate work.
     *
     * <p>If the sleep is interrupted, the interrupt flag is restored so the pool can
     * observe it, the event is logged, and the method still logs its end marker.
     */
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        log.info("start executeAsync");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so callers
            // further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            log.warn("executeAsync interrupted", e);
        }
        log.info("end executeAsync");
    }

2.3 如何获取任务的返回值

方法一:使用自旋操作,等待任务结果返回。

    /**
     * Starts both async tasks and polls their futures until both are done, then logs
     * the two results.
     *
     * @return {@code "S"} when both tasks completed normally, {@code "F"} if the wait was
     *         interrupted or either task failed
     */
    @RequestMapping("/helloFuture")
    @ResponseBody
    public String helloFuture() {
        try {
            Future<String> future1 = serviceImpl.doTask1();
            Future<String> future2 = serviceImpl.doTask2();
            // Poll once per second until both tasks report completion.
            while (!(future1.isDone() && future2.isDone())) {
                Thread.sleep(1000);
            }
            // Both futures are done, so get() returns (or throws) without blocking.
            log.info("Task1 result:{}", future1.get());
            log.info("Task2 result:{}", future2.get());
            log.info("All tasks finished.");
            return "S";
        } catch (InterruptedException e) {
            // Restore the interrupt flag that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
            log.error("错误信息1", e);
            return "F";
        } catch (ExecutionException e) {
            log.error("错误信息2", e);
            return "F";
        }
    }

方法二:使用CountDownLatch计数器

    /**
     * Starts both async tasks with a shared {@link CountDownLatch} and waits on the latch
     * until each task has counted down, then reads and logs the two results.
     *
     * <p>The wait has no timeout, so it relies on every task calling
     * {@code latch.countDown()} exactly once, including when the task fails.
     *
     * @return {@code "S"} when both tasks completed normally, {@code "F"} if the wait was
     *         interrupted or either task failed
     */
    @RequestMapping("/helloFuture2")
    @ResponseBody
    public String helloFuture2() {
        try {
            CountDownLatch latch = new CountDownLatch(2);
            Future<String> future1 = serviceImpl.doTask1(latch);
            Future<String> future2 = serviceImpl.doTask2(latch);
            // Block until both tasks have counted down.
            latch.await();
            log.info("All tasks finished!");
            String result1 = future1.get();
            String result2 = future2.get();
            log.info("{}--{}", result1, result2);
            return "S";
        } catch (InterruptedException e) {
            // Restore the interrupt flag that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for async tasks", e);
        } catch (ExecutionException e) {
            log.error("Async task failed", e);
        }
        return "F";
    }

每个任务执行完毕,只需要调用latch.countDown();使得计数器-1。

  /**
   * Async task with a return value, run on the {@code asyncServiceExecutor} pool, that
   * signals completion through the supplied latch.
   *
   * <p>The latch is counted down in a {@code finally} block so that a caller blocked in
   * {@code latch.await()} is released even when the simulated work is interrupted.
   *
   * @param latch latch to count down once this task has finished, successfully or not
   * @return a future holding the completion message
   * @throws InterruptedException if the simulated work is interrupted
   */
  @Async("asyncServiceExecutor")
  public Future<String> doTask1(CountDownLatch latch) throws InterruptedException {
      try {
          log.info("Task1 started.");
          long start = System.currentTimeMillis();
          Thread.sleep(5000);
          long end = System.currentTimeMillis();

          log.info("Task1 finished, time elapsed: {} ms.", end - start);
          return new AsyncResult<>("Task1 accomplished!");
      } finally {
          // Always release the waiter, otherwise a failure here would hang the caller.
          latch.countDown();
      }
  }

/**
 * Async task with a return value, run on the {@code customServiceExecutor} pool, that
 * signals completion through the supplied latch.
 *
 * <p>The latch is counted down in a {@code finally} block so that a caller blocked in
 * {@code latch.await()} is released even when the simulated work is interrupted.
 *
 * @param latch latch to count down once this task has finished, successfully or not
 * @return a future holding the completion message
 * @throws InterruptedException if the simulated work is interrupted
 */
@Async("customServiceExecutor")
public Future<String> doTask2(CountDownLatch latch) throws InterruptedException {
    try {
        log.info("Task2 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(3000);
        long end = System.currentTimeMillis();

        log.info("Task2 finished, time elapsed: {} ms.", end - start);
        return new AsyncResult<>("Task2 accomplished!");
    } finally {
        // Always release the waiter, otherwise a failure here would hang the caller.
        latch.countDown();
    }
}

方法三:使用Future的get方法的阻塞特性

    /**
     * Starts both async tasks and collects their results by calling {@code get()} on
     * each future in turn; each call blocks until that task has finished.
     *
     * <p>NOTE(review): this snippet reuses the method name and the {@code /helloFuture2}
     * route of the CountDownLatch example. The two are alternatives and cannot live in
     * the same controller unchanged; rename one if both are kept.
     *
     * @return {@code "S"} when both tasks completed normally, {@code "F"} if the wait was
     *         interrupted or either task failed
     */
    @RequestMapping("/helloFuture2")
    @ResponseBody
    public String helloFuture2() {
        try {
            List<Future<String>> tasks = new ArrayList<>();
            List<String> results = new ArrayList<>();
            tasks.add(serviceImpl.doTask1());
            tasks.add(serviceImpl.doTask2());
            for (Future<String> task : tasks) {
                // get() blocks here until this particular task is done.
                results.add(task.get());
            }
            log.info("All tasks finished!");
            log.info("执行结果:{}", JSON.toJSONString(results));
            return "S";
        } catch (InterruptedException e) {
            // Restore the interrupt flag that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for async tasks", e);
        } catch (ExecutionException e) {
            log.error("Async task failed", e);
        }
        return "F";
    }

2.4 Runnable异常处理

该配置可与线程池配置在一起,若异步线程抛出异常,会由该类打印。

@Configuration
public class ExecutorConfig implements AsyncConfigurer {
//配置异常处理机制
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex,method,params)->{
log.error("异步线程执行失败。方法:[{}],异常信息[{}] : ", method, ex.getMessage(),ex);
};
}
}

效果图:

2019-12-25 19:14:09.851 ERROR [] --- [async-service-1] c.g.Config.threadPool.ExecutorConfig     : 异步线程执行失败。方法:[public void com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(java.lang.String,java.lang.String)],异常信息[/ by zero] : 

java.lang.ArithmeticException: / by zero
at com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(AccountServiceImpl.java:308)
at com.galax.bussiness.account.impl.AccountServiceImpl$FastClassBySpringCGLIB$4e0db2a2.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

彩蛋——Future<T>使用lambda表达式

    /**
     * Demonstrates submitting a lambda as a {@code Callable<String>} to
     * {@code emailServiceExecutor} and receiving a {@code Future<String>}.
     *
     * <p>In this sample the lambda simply returns {@code "s"} (or {@code "F"} from its
     * catch branch); none of the method parameters are used and the returned future is
     * not read.
     *
     * @throws Exception declared by the sample signature
     */
    public void sendMail(Map<String, Object> model, String title, String templateName, String toMail, String[] ccMail, long timeout) throws Exception {
        Future<String> pending = emailServiceExecutor.submit(() -> {
            try {
                return "s";
            } catch (Exception e) {
                return "F";
            }
        });

    }

彩蛋——若自定义实现线程池,如何获取到各个任务的结果

  /**
   * Shows how to collect per-task results from a hand-built {@link ThreadPoolExecutor}:
   * wrap each item in a {@code Callable}, run them all with {@code invokeAll}, then read
   * every returned future.
   *
   * <p>The pool has 5 threads and a bounded queue of 9. It is shut down in a
   * {@code finally} block so its threads are released even if {@code invokeAll} is
   * interrupted.
   *
   * @throws InterruptedException if interrupted while waiting for the tasks
   */
  @Test
  public void test() throws InterruptedException {
      ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
              TimeUnit.SECONDS, new ArrayBlockingQueue<>(9));
      try {
          // Items to process (empty in this sample; 5 is only the initial capacity).
          List<Student> students = new ArrayList<>(5);
          // One Callable per item.
          List<Callable<Integer>> callables = new ArrayList<>();
          for (Student student : students) {
              callables.add(() -> {
                  log.info(JSON.toJSONString(student));
                  // The actual work, executed concurrently on the pool.
                  int saved = serviceImpl.getStu(student);
                  return saved;
              });
          }
          // invokeAll returns only after every task has completed.
          List<Future<Integer>> futures = executor.invokeAll(callables);
          List<Integer> results = new ArrayList<>(futures.size());
          for (Future<Integer> future : futures) {
              try {
                  results.add(future.get());
              } catch (ExecutionException e) {
                  // A failed task is logged and skipped; the remaining results are still collected.
                  log.error("Task execution failed", e);
              }
          }
          log.info("数据执行完毕!{}", JSON.toJSONString(results));
      } finally {
          executor.shutdown();
      }
  }

推荐阅读

Callable+ThreadPoolExecutor实现多线程并发并获得返回值

springboot线程池的使用和扩展

多线程——线程池ThreadPoolExecutor

举报

相关推荐

0 条评论