JUC 并发组件CompletableFuture
优势
- 不会阻塞当前线程,如主线程后续逻辑;
- 异常处理
- 多个异步任务的编排
- 函数式编程、链式处理等
基本原理
CompletableFuture 同样实现了之前的 Future ,此外新增加了接口 CompletionStage的实现,链式调用这些方法在这里都能找到。
链式增加的这些依赖动作,是利用的 Treiber stack 即并发安全的无锁队列(栈),当然也是基于CAS操作。
常用 API 使用
创建任务 supplyAsync、runAsync
默认使用 ForkJoinPool.commonPool() 进程内共享同一个线程池,不同业务场景如果都使用该默认线程池可能会互相影响,因此强烈建议各自场景有自己的线程池。
使用标准:带自定义线程池参数的方式。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}
- supplyAsync 的任务 带返回值
- runAsync 的任务 无返回值
方法的返回类型是 CompletableFuture,可以获取返回值,如下几个方式:
1. V get();
2. V get(long timeout,Timeout unit);
3. T getNow(T defaultValue);
4. T join();
- get() 阻塞获取结果,需要强制捕获异常
- join() 阻塞获取结果,抛出 unchecked 异常,可不用捕获。方便在lambda 表达式使用。
- getNow(T defaultValue) 如果任务未执行完则取默认值,否则取任务返回值。也会抛出异常,但不强制捕获。
- get(long timeout,Timeout unit) 如果超时则抛出超时异常。
demo
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// ...
return 1;
}, mypool);
try {
Integer integer = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
链式处理 thenAccept、thenRun、thenApply
如上述返回的 CompletableFuture 对象,可以继续链式调用这些方法接着处理任务的返回,串联任务。
- thenAccept() 接收任务返回值,处理,不再返回参数
- thenApply() 接收任务返回值,处理,再返回参数
- thenRun() 继续处理,不关心参数
也可以再这个 then 的处理中,再使用异步处理,交给比如其他线程池。如
- thenAcceptAsync()
- thenRunAsync()
- thenApplyAsync()
demo
CompletableFuture<Void> future2 = future.thenApply(i -> {
return i + 10;
}).thenApplyAsync(i -> {
return i + 20;
}, mypool2).thenAccept(i -> {
System.out.println(i);
});
future2.join();
异常处理 exceptionally、handle
- exceptionally(…)
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture 的一个优势就是链式的异常处理,如果前边任务发生异常,这里可以做一个兜底返回等处理。应用时要注意,只有发生异常了,才会进入该方法执行。
demo
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int j = 1/0;
return 1;
}, mypool).exceptionally(e -> {
// 异常时才会进入,不发生异常不会进入
System.out.println(e.getMessage());
return 2;
});
- handle(…)
与 exceptionally() 类似,handle() 也可以对异常进行处理,并且包括 exceptionally() 的能力,也可以对非异常进行处理。这里就要注意对异常的非空判断。
handle 接收两个参数(原返回值 + 异常),返回一个参数。
demo
CompletableFuture<String> future = CompletableFuture.
supplyAsync(() -> String.valueOf(1 / 0))
.handle((s, e) -> {
if (e == null) {
System.out.println(s);
} else {
//java.lang.ArithmeticException: / by zero
System.out.println(e.getMessage());
}
return "handle result";
})
.exceptionally(e -> {
// 未执行
System.out.println("ex:" + e.getMessage());
return "exceptionally result";
});
System.out.println(future.join());
注意到如果先操作了 handle,并且不再抛异常,那么后续的 exceptionally 就不会再进入执行。
- whenComplete(…)
和 handle(…) 相似,whenComplete(…) 也是接受 两个参数(原返回值 + 异常),但无返回值。同样也是有无异常都会进入该方法。
编排多个异步任务
thenCombine、thenAcceptBoth、runAfterBoth
前文中都是针对某一个 CompletableFuture 对象,做链式处理。如果有多个异步任务,即多个 CompletableFuture,也提供了编排处理的API。
-
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
合并两个 CompletableFuture 任务,均拿到各自返回值,func 处理后,再返回 CompletableFuture 对象
-
thenAcceptBoth(…)
拿到前两个各自返回值,func 处理后,不对外返回值
-
runAfterBoth(…)
不关心前两个返回值(但都要执行完),也不对外返回
以上几个方法,也都提供 Async 方法。如果不使用 Async,func 执行默认使用前两个中后执行完的线程。
demo
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("future1 thread: " + Thread.currentThread().getName());
sleepx(1);
return 1;
}, mypool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2 thread: " + Thread.currentThread().getName());
sleepx(2);
return "result";
}, mypool);
CompletableFuture<String> okFuture = future1.thenCombine(future2, (future1Result, future2Result) -> {
System.out.println("okFuture thread: " + Thread.currentThread().getName());
return future1Result + " " + future2Result;
});
String join = okFuture.join();
applyToEither、acceptEither、runAfterEither
与上文 Both 类似,Either 是两个CompletableFuture 对象中,有一个完成即可执行后续。
demo
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("future1 thread: " + Thread.currentThread().getName());
sleepx(1);
return "1";
}, mypool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2 thread: " + Thread.currentThread().getName());
sleepx(2);
return "2";
}, mypool);
CompletableFuture<String> okFuture = future1.applyToEither(future2, (futureResult) -> {
System.out.println("okFuture thread: " + Thread.currentThread().getName());
return futureResult;
});
String join = okFuture.join();
allOf、anyOf
上文中的编排 API 都是两个 CompletableFuture 的操作,当然再接着链式编程也可以绑上多个 CompletableFuture。而 allOf、anyOf 的API 就支持多个 CompletableFuture。
- allOf(…) 等待所有 CompletableFuture 返回,
- anyOf(…) 等待最快的 CompletableFuture 返回
注意直到调用 join() 才阻塞等待返回
demo
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
System.out.println("future1 thread: " + Thread.currentThread().getName());
sleepx(1);
return "hi";
}, mypool);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
System.out.println("future1 thread: " + Thread.currentThread().getName());
sleepx(2);
return "hello";
}, mypool);
CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
System.out.println("future1 thread: " + Thread.currentThread().getName());
sleepx(2);
return "halo";
}, mypool);
long begin = System.currentTimeMillis();
// anyOf
CompletableFuture.anyOf(futureA, futureB, futureC).join();
// 1s
System.out.println("anyOf cost: " + (System.currentTimeMillis() - begin));
// allOf
CompletableFuture.allOf(futureA, futureB, futureC).join();
// 2s
System.out.println("allOf cost: " + (System.currentTimeMillis() - begin));
参考
- 《Java8 实战》
- 使用CompletableFuture优化代码