通过函数式接口实现模板模式线程池
 
 
主class
 
package demo;
import java.util.*;
import java.util.concurrent.*;
/**
* @date:  2021/9/30 9:58
* @version: V1.0
* @Description:
* <p>
*  批处理的工具类
* </p>
*/
public class BatchTemplate {
/**
 * 定义最大的执行线程数量
 */
private static final Integer MAX_THREAD  = 8;
/**
 *
 * @param sourceList 执行的数据列表
 * @param splitCount 拆分的数量
 * @param executor 执行的方法
 * @param combiner 对返回值的拼接处理
 * @param maxThread 最大线程数
 * @param <T> 请求参数
 * @param <V> 返回值
 * @return
 */
public static <T,V> WhhRestResult executeWithSplitList(List<T> sourceList,Integer splitCount,
                                                       BatchCallExecutor<List<T>,V> executor,ResultCombiner<V> combiner,Integer maxThread) {
    //拆分
    List<List<T>> splitGroup = ListSplitUtils.splitList(sourceList,splitCount);
    return  executeWithoutException(splitGroup,executor,combiner,maxThread);
}
/**
 * 不含异常的处理方法
 * @param req
 * @param executor
 * @param combiner
 * @param maxThread
 * @param <T>
 * @param <V>
 * @return
 */
public static <T,V> WhhRestResult executeWithoutException(Collection<T> req, BatchCallExecutor<T,V> executor, ResultCombiner<V> combiner, Integer maxThread){
    return execute(req,executor,combiner,null,maxThread);
}
/**
 * 简单的无需处理返回值和异常
 * @param req
 * @param executor
 * @param <T>
 * @return
 */
public static <T> WhhRestResult execute(Collection<T> req, BatchExecutor<T> executor) {
    BatchCallExecutor<T,Boolean> batchCallExecutor = e->{
        executor.execute(e);
        return true;
    };
    return execute(req, batchCallExecutor, aBoolean -> {});
}
/**
 * 需要对异常进行处理
 * @param req
 * @param executor
 * @param handler
 * @param <T>
 * @return
 */
public static <T> WhhRestResult execute(Collection<T> req, BatchExecutor<T> executor, BatchErrorHandler<T> handler) {
    BatchCallExecutor<T,Boolean> batchCallExecutor = e->{
        executor.execute(e);
        return true;
    };
    return execute(req, batchCallExecutor, aBoolean -> {},handler,MAX_THREAD);
}
/**
 * 无需处理异常
 * @param req
 * @param executor
 * @param combiner
 * @param <T>
 * @param <V>
 * @return
 */
public static <T,V> WhhRestResult execute(Collection<T> req, BatchCallExecutor<T,V> executor,ResultCombiner<V> combiner){
    return execute(req,executor,combiner,null);
}
/**
 * 无需指定最大线程数
 * @param req
 * @param executor
 * @param combiner
 * @param handler
 * @param <T>
 * @param <V>
 * @return
 */
public static <T,V> WhhRestResult execute(Collection<T> req, BatchCallExecutor<T,V> executor,ResultCombiner<V> combiner,BatchErrorHandler<T> handler) {
    return execute(req,executor,combiner,handler,MAX_THREAD);
}
/**
 * 批量执行的核心方法,如果部分成功,统一返回WhhRestResult[200]
 * 允许用户自定义处理异常和结果集,无并发问题
 * @param req 请求的参数集合,集合中每个元素给定一个线程执行
 * @param executor 处理元素的执行器
 * @param combiner 元素返回结果的合并执行器
 * @param handler 处理异常的执行器,将请求参数作为回调参数
 * @param maxThread 用户指定的最大执行线程数
 * @param <T> 请求参数类型
 * @param <V> 返回参数类型
 * @return
 */
public static <T,V> WhhRestResult execute(
        Collection<T> req,
        BatchCallExecutor<T,V> executor,
        ResultCombiner<V> combiner,
        BatchErrorHandler<T> handler,
        Integer maxThread
) {
    //如果要执行的列表为空,直接返回
    if(CollectionUtils.isEmpty(req)){
        return WhhRestResult.ok().message(null);
    }
    //如果执行的数量为1,则不需要开启多线程
    if(req.size()==1){
        T t =  req.iterator().next();
        try{
            V v =  executor.execute(t);
            combiner.combine(v);
        }catch (Exception ex){
            //如果定义了异常处理,则执行自定义异常处理
            if(handler!=null){
                log.error("批量执行单条时出错:原因: ",ex);
                handler.handError(t,ex);
                return WhhRestResult.server_error().message(ex.getMessage());
            }else{
                log.error("批量执行时出错:",ex);
                throw new RuntimeException(ex.getMessage());
            }
        }
        return WhhRestResult.ok();
    }
    //定义最大的线程数
    //不论用户定义多大,自己要定一个上线
    Integer maxNum  = req.size()+1>maxThread?maxThread:req.size()+1;
    if(maxNum>MAX_THREAD){
        maxNum = MAX_THREAD;
    }
    //新开一个定长的线程池
    ExecutorService executorService = Executors.newFixedThreadPool(maxNum);
    List<Callable> tasks = new ArrayList<Callable>();
    List<T> reqList = new ArrayList<>();
    //保证线程同步发挥数据
    for (T t : req) {
        tasks.add((Callable<V>) () -> executor.execute(t));
        reqList.add(t);
    }
    List<Future<V>> resultList = new ArrayList();
    for (Callable task : tasks) {
        Future future = executorService.submit(task);
        resultList.add(future);
    }
    //执行过后关闭线程池
    executorService.shutdown();
    //拼接错误信息返回给用户
    StringBuilder sb = new StringBuilder("错误信息");
    //记录是否成功
    Boolean isSuccess = true;
    //记录是否部分成功
    Boolean partSuccess = false;
    Set<String> errorMsg = new HashSet<>();
    for (int i = 0; i < resultList.size(); i++) {
        Future<V> fs = resultList.get(i);
        try {
            V result = fs.get();
            combiner.combine(result);
            partSuccess= true;
        } catch (InterruptedException e) {
            isSuccess = false;
            if(handler!=null){
                handler.handError(reqList.get(i),e);
            }else{
                throw new RuntimeException(e.getMessage());
            }
        } catch (ExecutionException e) {
            isSuccess = false;
            if(handler!=null){
                if(!errorMsg.contains(e.getMessage())){
                    sb.append(":"+e.getMessage());
                    errorMsg.add(e.getMessage());
                }
                handler.handError(reqList.get(i),e);
            }else{
                //业务异常记录日志
                log.error("批量执行时出错:",e);
                throw new RuntimeException(e.getMessage());
            }
        }
    }
    WhhRestResult result;
    //如果没有部分成功,则返回异常
    if(!partSuccess){
        result = WhhRestResult.server_error().message(sb.toString());
    }else{
        //如果部分成功,返回状态码都是200
        result = WhhRestResult.ok();
        if(!isSuccess){
            result.setMessage(sb.toString());
        }else{
            result.setMessage(null);
        }
    }
    return result;
}
}
 
参与的一些函数式接口
 
public interface ResultCombiner<V> {
/**
 * 处理返回值
 * @param v 返回值
 */
void combine(V v);
}
public interface BatchCallExecutor<T, V> {
	 V execute(T req);
}
public interface BatchExecutor<T> {
/**
 *
 * @param req 请求参数
 */
void execute(T req);
}
public interface BatchErrorHandler<T> {
/**
 *
 * @param req 请求参数
 * @param ex 异常
 */
void handError(T req,Exception ex);
}
 
List分组类
 
public class ListSplitUtils {
public static <T> List<List<T>> splitList(List<T> list, int len) {
    if (list == null || list.size() == 0 || len < 1) {
        return null;
    }
    List<List<T>> result = new ArrayList<>();
    int size = list.size();
    int count = (size + len - 1) / len;
    for (int i = 0; i < count; i++) {
        List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
        result.add(subList);
    }
    return result;
}
}