0
点赞
收藏
分享

微信扫一扫

java 用CompletableFuture来实现多线程查询和结果合并

多线程查询结果合并

使用CompletableFuture来实现多线程查询和结果合并。CompletableFuture提供了一种方便的方式来协调异步任务并处理其结果。下面是一个使用CompletableFuture的示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class MultiThreadQueryExample {

    public static void main(String[] args) {
        // 模拟要查询的数据
        List<String> data = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            data.add("Data " + i);
        }

        // 创建CompletableFuture列表,用于存储每个查询的结果
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();

        // 每个CompletableFuture负责查询一部分数据
        int batchSize = 20;
        for (int i = 0; i < data.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, data.size());
            List<String> subList = data.subList(i, endIndex);
            CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> performQuery(subList));
            futures.add(future);
        }

        // 使用CompletableFuture的allOf方法等待所有查询完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // 当所有查询完成时,对所有结果进行合并
        CompletableFuture<List<String>> mergedResult = allFutures.thenApply(v ->
                futures.stream()
                        .map(CompletableFuture::join)
                        .flatMap(List::stream)
                        .collect(Collectors.toList())
        );

        // 阻塞等待合并结果
        List<String> result = mergedResult.join();

        // 输出合并后的结果
        System.out.println("Merged Result:");
        for (String r : result) {
            System.out.println(r);
        }
    }

    private static List<String> performQuery(List<String> data) {
        // 在这里进行查询操作,返回查询结果

        // 模拟查询耗时
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回查询结果
        List<String> result = new ArrayList<>();
        for (String item : data) {
            result.add("Result for " + item);
        }
        return result;
    }
}

在这个示例中,我们使用CompletableFuture.supplyAsync()方法将查询任务提交给线程池执行异步查询操作,并返回一个CompletableFuture对象表示查询的结果。所有的CompletableFuture对象存储在futures列表中。

然后,我们使用CompletableFuture.allOf()方法等待所有查询任务完成。当所有任务都完成时,我们使用thenApply()方法对所有结果进行合并操作,最终得到一个合并后的CompletableFuture<List<String>>对象。

通过调用join()方法,我们可以阻塞等待合并结果的完成,并获取最终的查询结果。

请注意,CompletableFuture还提供了其他一些方法,如thenCompose()thenCombine()等,可以进一步实现异步任务的流水线处理和组合操作,以满足不同的业务需求。


future.join()与future.get()的区别

future.join()future.get() 都可以用于获取 CompletableFuture 的结果,但在具体的使用上有一些差异。

  1. 异常处理:join() 方法在遇到异常时会将其包装在 CompletionException 中直接抛出;而 get() 方法会抛出 InterruptedException(需要处理线程中断)和 ExecutionException(需要处理任务执行过程中发生的异常)。因此,在使用时需要根据具体情况选择合适的异常处理方式。
  2. 受检异常:get() 方法声明了抛出 InterruptedExceptionExecutionException 这两个受检异常,因此在使用 get() 方法时必须显式地处理这两种异常,或者将它们往上层抛出。相比之下,join() 方法没有声明受检异常,所以不需要在代码中强制处理。
  3. 线程中断:get() 方法会抛出 InterruptedException,这意味着在调用 get() 时,如果当前线程被中断,就会抛出该异常。这要求在使用 get() 方法时必须处理线程中断的情况。而 join() 方法并不抛出 InterruptedException,所以在调用 join() 时不会对线程中断状态进行检查和处理。
  4. 返回值和异常封装:get() 方法返回的是 Future 的结果对象,通过调用 get() 方法可以获取这个结果,或者通过 isDone() 方法判断是否完成。而 join() 方法直接返回结果值,并且如果遇到异常,会将异常包装在 CompletionException 中抛出。

总结来说,join() 方法更加简洁,不需要显式处理受检异常和线程中断,但对于异常处理,其抛出的异常类型相对固定,无法区分具体的异常类型。而 get() 方法需要显式处理受检异常和线程中断,但可以更细粒度地对不同类型的异常进行处理。


举报

相关推荐

0 条评论