0 - 前言
在之前一篇文章《Java多线程?用CompletableFuture就够了》里,介绍了如何使用 CompletableFuture 进行多线程并发操作,但是限定了并发的子线程个数为一个确定值,在代码层面就固定了。当并发的子线程数量不固定时,那么,之前的用法就无法继续使用,此时需要换一个用法。
1 - 循环创建并发线程
1.1 - 基本思路
基本思路是:将所有的子线程任务通过循环的方式放入到一个 List<CompletableFuture> 里,根据业务的场景,选择不同的方法:
- 所有子线程都需要完成后再执行主线程
CompletableFuture.allOf().join()
- 其中任何一个子线程完成后就执行主线程
ComPletableFuture.anyOf()
1.2 - 上代码
业务场景:根据上传的多个行政区编码(adCode)并发查询天气信息。
因为 qWeatherByCode() 方法有返回值,所以需要使用 CompletableFuture.supplyAsync() 方法。
该方法返回一个 CompletableFuture 对象,然后加入到 List<CompletableFuture> 对象里。
然后使用 CompletableFuture.allOf().join() 方法,当调用该方法时,主线程会一直阻塞,直到 List<CompletableFuture> 里的子线程均已完成(或者超时)。
List<CompletableFuture> futures = new ArrayList();for (String adCode : adCodeList) { futures.add(CompletableFuture.supplyAsync(()-> qWeatherByCode(adCode) ));}CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();复制代码
需要注意的是,上面的代码里 CompletableFuture.supplyAsync(()->qWeatherByCode(adCode)),没有指定 Executor,所以使用默认的线程池 ForkJoinPool.commonPool()。
ForkJoinPool.commonPool() 是一个共享线程池(基于服务器内核的限制,如果CPU是八核,每次线程只能起八个,不能自定义线程池),如果使用不当,会对性能造成严重的影响。所以一般建议这里使用自定义的 Executor:
List<CompletableFuture> futures = new ArrayList();for (String adCode : adCodeList) { futures.add(CompletableFuture.supplyAsync(()-> qWeatherByCode(adCode), asyncExecutor() ));}CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
asyncExecutor():
@Bean("asyncExcutor")public Executor asyncExecutor() { log.info("start async executor"); ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 配置核心线程数 threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);// 配置最大线程数 threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);// 配置队列大小 threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);// 配置线程池中线程的名称前缀 threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;// DiscardPolicy:丢弃当前将要加入队列的任务;// DiscardOldestPolicy:丢弃任务队列中最旧的任务; threadPoolTaskExecutor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; }
2 - CompletableFuture的常用场景
package com.example.demo;import org.junit.Test;import java.util.Arrays;import java.util.List;import java.util.Random;import java.util.concurrent.*;public class CompletableFutureDemo { /** * 创建CompletableFuture * - runAsync * - supplyAsync * - completedFuture * <p> * 异步计算启用的线程池是守护线程 */ @Test public void test1() { //1、异步计算:无返回值 //默认线程池为:ForkJoinPool.commonPool() CompletableFuture.runAsync(() -> { // TODO: 2018/9/8 无返回异步计算 System.out.println(Thread.currentThread().isDaemon()); }); //指定线程池,(到了jdk9CompletableFuture还拓展了延迟的线程池) CompletableFuture.runAsync(() -> { // TODO: 2018/9/8 无返回异步计算 }, Executors.newFixedThreadPool(2)); //2、异步计算:有返回值 // 使用默认线程池 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "result1"); //getNow指定异步计算抛出异常或结果返回null时替代的的值 String result1 = future1.getNow(null); // 指定线程池 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "result2", Executors.newFixedThreadPool(2)); //getNow指定异步计算抛出异常或结果返回null时替代的的值 String result2 = future2.getNow(null); //3、初始化一个有结果无计算的CompletableFuture CompletableFuture<String> future = CompletableFuture.completedFuture("result"); String now = future.getNow(null); System.out.println("now = " + now); } /** * 计算完成时需要对异常进行处理或者对结果进行处理 * - whenComplete:同步处理包括异常 * - thenApply:同步处理正常结果(前提是没有异常) * <p> * - whenCompleteAsync:异步处理包括异常 * - thenApplyAsync:异步处理正常结果(前提是没有异常) * <p> * - exceptionally : 处理异常 */ @Test public void test2() { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result"); //whenComplete方法收future的结果和异常,可灵活进行处理 //1、同步处理 // 无返回值:可处理异常 future.whenComplete((result, throwable) -> System.out.println("result = " + result)); // 有返回值:没有异常处理(前提) CompletableFuture<String> resultFuture1 = future.thenApply(result -> "result"); String result1 = resultFuture1.getNow(null); //2、异步处理: // 无返回值: 默认线程池 future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result)); // 无返回值:指定线程池 future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result), Executors.newFixedThreadPool(2)); // 有返回值:默认线程池 CompletableFuture<String> resultFuture2 = future.thenApplyAsync(result -> "result"); String result2 = resultFuture2.getNow(null); // 有返回值:指定线程池 CompletableFuture<String> resultFuture3 = future.thenApplyAsync(result -> "result", Executors.newFixedThreadPool(2)); String result3 = resultFuture3.getNow(null); //3、处理异常,处理完之后返回一个结果 CompletableFuture<String> exceptionallyFuture = future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + 1 / 0)) .exceptionally(throwable -> "发生异常了:" + throwable.getMessage()); System.out.println(exceptionallyFuture.getNow(null)); } /** * 异常处理还可以使用以下两个方法 * - handle * - handleAsync * <p> * 备注:exceptionally同步和异步计算一起用如果出现异常会把异常抛出。用以上的方法可以拦截处理 */ @Test public void test3() { CompletableFuture<String> exceptionoHandle = CompletableFuture.completedFuture("produce msg") .thenApplyAsync(s -> "result" + 1 / 0); String handleResult1 = exceptionoHandle.handle((s, throwable) -> { if (throwable != null) { return throwable.getMessage(); } return s; }).getNow(null); //指定线程池 String handleResult2 = exceptionoHandle.handleAsync((s, throwable) -> { if (throwable != null) { return throwable.getMessage(); } return s; }, Executors.newFixedThreadPool(2)).getNow(null); } /** * 生产--消费 * - thenAccept:同步的 * - thenAcceptAsync:异步的 * <p> * 接受上一个处理结果,并实现一个Consumer,消费结果 */ @Test public void test4() { //同步的 CompletableFuture.completedFuture("produce msg") .thenAccept(s -> System.out.println("sync consumed msg : " + s)); //异步的 //默认线程池 CompletableFuture.completedFuture("produce msg") .thenAcceptAsync(s -> System.out.println("async consumed msg : " + s)); //指定线程池 CompletableFuture.completedFuture("produce msg") .thenAcceptAsync(s -> System.out.println("async consumed msg : " + s), Executors.newFixedThreadPool(2)); } /** * 取消任务 * - cancel */ @Test public void test5() throws InterruptedException { CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); } return s + "result"; }); String now = message.getNow(null); System.out.println("now = " + now); //取消 boolean cancel = message.cancel(true); System.out.println("cancel = " + cancel); //如果这里再去获取,会抛出异常,说明已经取消了 //String now1 = message.getNow(null); Thread.sleep(1000); } /** * 两个异步计算 * - applyToEither:有返回值,同步 * - acceptEither:无返回值,同步 * - applyToEitherAsync:有返回值,异步 * - */ @Test public void test6() { CompletableFuture<String> task1 = CompletableFuture.completedFuture("task1") .thenApply(s -> "task1的计算结果:s1 = " + s); //同步,有返回值 //applyToEither第二个参数接收的值是task1计算的返回值 CompletableFuture<String> result1 = task1.applyToEither(CompletableFuture.completedFuture("task2") .thenApply(s -> "task2的计算结果:s2 = " + s), s -> s); System.out.println("task2:" + result1.getNow(null)); //同步,无返回值 task1.acceptEither(CompletableFuture.completedFuture("task3") .thenApply(s -> "task3的计算结果:s3 = " + s), s -> System.out.println("task3:" + s)); //异步有返回值,默认线程池,也可以指定 CompletableFuture<String> result2 = task1.applyToEitherAsync(CompletableFuture.completedFuture("task4") .thenApply(s -> "task4的计算结果:s4 = " + s), s -> s); //由于是异步的,主线程跑的快一点,因此join()之后才能看到跑完的结果 System.out.println("task4:" + result2.join()); //异步无返回值,指定线程池,也可以使用默认线程池 CompletableFuture<Void> task5 = task1.acceptEitherAsync(CompletableFuture.completedFuture("task5") .thenApply(s -> "task5的计算结果:s5 = " + s), s -> System.out.println("task5:" + s), Executors.newFixedThreadPool(2)); task5.join(); } /** * 组合计算结果 * - runAfterBoth:都计算完之后执行一段代码 * - thenAcceptBoth:都计算完之后把结果传入,并执行一段代码 * <p> * - thenCombine:组合两个结果 * - thenCompose:组合两个结果 */ @Test public void test7() { //runAfterBoth方式 StringBuilder msg = new StringBuilder("jorgeZhong"); CompletableFuture.completedFuture(msg) .thenApply(s -> s.append(" task1,")) .runAfterBoth(CompletableFuture.completedFuture(msg) .thenApply(s -> s.append(" task2")), () -> System.out.println(msg)); //thenAcceptBoth方式 CompletableFuture.completedFuture("jorgeZhong") .thenApplyAsync(String::toLowerCase) .thenAcceptBoth(CompletableFuture.completedFuture("jorgeZhong") .thenApplyAsync(String::toUpperCase), (s, s2) -> System.out .println("s1:" + s + ", s2:" + s2)); //thenCombine方式 CompletableFuture<String> result1 = CompletableFuture.completedFuture("jorgeZhong") .thenApply(String::toLowerCase) .thenCombine(CompletableFuture.completedFuture("jorgeZhong") .thenApply(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2); System.out.println("result1:" + result1.getNow(null)); //异步 CompletableFuture<String> result11 = CompletableFuture.completedFuture("jorgeZhong") .thenApply(String::toLowerCase) .thenCombineAsync(CompletableFuture.completedFuture("jorgeZhong") .thenApplyAsync(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2); System.out.println("result11:" + result11.join()); //thenCompose方式 CompletableFuture<String> result2 = CompletableFuture.completedFuture("jorgeZhong") .thenApply(String::toLowerCase) .thenCompose(s -> CompletableFuture.completedFuture("jorgeZhong") .thenApply(String::toUpperCase) .thenApply(s1 -> "s:" + s + ", s1:" + s1)); System.out.println("result2:" + result2.getNow(null)); //异步 CompletableFuture<String> result22 = CompletableFuture.completedFuture("jorgeZhong") .thenApply(String::toLowerCase) .thenComposeAsync(s -> CompletableFuture.completedFuture("jorgeZhong") .thenApplyAsync(String::toUpperCase) .thenApplyAsync(s1 -> "s:" + s + ", s1:" + s1)); System.out.println("result22:" + result22.join()); } /** * 多个CompletableFuture策略 * - anyOf:接受一个CompletableFuture数组,任意一个任务执行完返回。都会触发该CompletableFuture * - whenComplete:计算执行完之后执行实现的一段代码,将上一个结果和异常作为参数传入 */ @Test public void test8() throws InterruptedException { List<String> messages = Arrays.asList("a", "b", "c"); CompletableFuture.anyOf(messages.stream() .map(o -> CompletableFuture.completedFuture(o).thenApplyAsync(s -> { try { Thread.sleep(new Random().ints(99, 300).findFirst().getAsInt()); } catch (InterruptedException e) { e.printStackTrace(); } return s.toUpperCase(); })) .toArray(CompletableFuture[]::new)) .whenComplete((res, throwable) -> { if (throwable == null) { System.out.println(res.toString()); } }); Thread.sleep(1000); } /** * 多个CompletableFuture策略 * - allOf:接受一个CompletableFuture数组,所有任务返回后,创建一个CompletableFuture */ @Test public void test9() { List<String> messages = Arrays.asList("a", "b", "c"); CompletableFuture[] cfs = messages.stream() .map(s -> CompletableFuture.completedFuture(s).thenApplyAsync(String::toUpperCase)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(cfs) .whenCompleteAsync((aVoid, throwable) -> Arrays.stream(cfs).forEach(completableFuture -> System.out .println(completableFuture.getNow(null)))); }}
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/97660.html