CompletableFuture异步编排

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

CompletableFuture异步编排

ingxx   2020-03-30 我要评论
## 什么是CompletableFuture CompletableFuture是JDK8提供的Future增强类。CompletableFuture异步任务执行线程池,默认是把异步任务都放在ForkJoinPool中执行。 在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。 ## Future存在的问题 Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。 ## 使用 ### runAsync 和 supplyAsync方法 CompletableFuture 提供了四个静态方法来创建一个异步操作。 ``` public static CompletableFuture runAsync(Runnable runnable) public static CompletableFuture runAsync(Runnable runnable, Executor executor) public static CompletableFuture supplyAsync(Supplier supplier) public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) ``` 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。 - runAsync方法不支持返回值。 - supplyAsync可以支持返回值。 ## 计算完成时回调方法 当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法: ```java public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action); public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action); public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor); public CompletableFuture exceptionally(Function fn); ``` whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务 whenComplete 和 whenCompleteAsync 的区别: whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。 whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。 **方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)** 代码示例: ```java public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() { @Override public Object get() { System.out.println(Thread.currentThread().getName() + "\t completableFuture"); int i = 10 / 0; return 1024; } }).whenComplete(new BiConsumer() { @Override public void accept(Object o, Throwable throwable) { System.out.println("-------o=" + o.toString()); System.out.println("-------throwable=" + throwable); } }).exceptionally(new Function() { @Override public Object apply(Throwable throwable) { System.out.println("throwable=" + throwable); return 6666; } }); System.out.println(future.get()); } } ``` ## handle 方法 handle 是执行**任务完成时**对结果的处理。 handle 是在任务完成后再执行,还可以处理异常的任务。 ```java public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn); public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor); ``` ## 线程串行化方法 thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。 thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。 thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作 带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。 ```java public CompletableFuture thenApply(Function<? super T,? extends U> fn) public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn) public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) public CompletionStage thenAccept(Consumer<? super T> action); public CompletionStage thenAcceptAsync(Consumer<? super T> action); public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor); public CompletionStage thenRun(Runnable action); public CompletionStage thenRunAsync(Runnable action); public CompletionStage thenRunAsync(Runnable action,Executor executor); ``` Function<? super T,? extends U> T:上一个任务返回结果的类型 U:当前任务的返回值类型 代码演示: ```java public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() { @Override public Integer get() { System.out.println(Thread.currentThread().getName() + "\t completableFuture"); //int i = 10 / 0; return 1024; } }).thenApply(new Function() { @Override public Integer apply(Integer o) { System.out.println("thenApply方法,上次返回结果:" + o); return o * 2; } }).whenComplete(new BiConsumer() { @Override public void accept(Integer o, Throwable throwable) { System.out.println("-------o=" + o); System.out.println("-------throwable=" + throwable); } }).exceptionally(new Function() { @Override public Integer apply(Throwable throwable) { System.out.println("throwable=" + throwable); return 6666; } }).handle(new BiFunction() { @Override public Integer apply(Integer integer, Throwable throwable) { System.out.println("handle o=" + integer); System.out.println("handle throwable=" + throwable); return 8888; } }); System.out.println(future.get()); } ``` ## 两任务组合 - 都要完成 两个任务必须都完成,触发该任务。 thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值 thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。 runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。 ```java public CompletableFuture thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public CompletableFuture thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public CompletableFuture thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor); public CompletableFuture thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); public CompletableFuture thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); public CompletableFuture thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor); public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action); public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action); public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor); ``` 测试案例: ```java public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { return "hello"; }).thenApplyAsync(t -> { return t + " world!"; }).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> { return t + u; }).whenComplete((t, u) -> { System.out.println(t); }); } ``` 输出:hello world! CompletableFuture ## 两任务组合 - 一个完成 当两个任务中,任意一个future任务完成的时候,执行任务。 applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。 acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。 runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。 ```java public CompletableFuture applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletableFuture applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletableFuture applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor); public CompletableFuture acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action); public CompletableFuture acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action); public CompletableFuture acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor); public CompletableFuture runAfterEither(CompletionStage<?> other, Runnable action); public CompletableFuture runAfterEitherAsync(CompletionStage<?> other, Runnable action); public CompletableFuture runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor); ``` ## 多任务组合 ```java public static CompletableFuture allOf(CompletableFuture<?>... cfs); public static CompletableFuture anyOf(CompletableFuture<?>... cfs); ``` allOf:等待所有任务完成 anyOf:只要有一个任务完成 ```java public static void main(String[] args) { List futures = Arrays.asList(CompletableFuture.completedFuture("hello"), CompletableFuture.completedFuture(" world!"), CompletableFuture.completedFuture(" hello"), CompletableFuture.completedFuture("java!")); final CompletableFuture allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})); allCompleted.thenRun(() -> { futures.stream().forEach(future -> { try { System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }); } ```

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们