首页>>后端>>java->多线程之 completableFuture 详述

多线程之 completableFuture 详述

时间:2023-11-30 本站 点击:1

先谈谈 Future

Callable 与 Runnable 的功能大致相似,但是 call() 函数有返回值。Callable 一般是和 ExecutorService 配合来使用的

Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成

在 Future 接口中声明了 5 个方法

cancel 方法用来取消任务,如果取消任务成功则返回 true,如果取消任务失败则返回 false。

isCancelled 方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

isDone 方法表示任务是否已经完成,若任务完成,则返回 true;

get() 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

get(long timeout, TimeUnitunit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。

也就是说 Future 提供了三种功能:

判断任务是否完成;

能够中断任务;

能够获取任务执行结果。

因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了 FutureTask。

来两个 demo:

publicstaticvoidfutureDemo1()throwsExecutionException,InterruptedException{ThreadPoolExecutorpool=CommonThreadPool.getPool();Future<Integer>f=pool.submit(()->{//长时间的异步计算Thread.sleep(2000);//然后返回结果return100;});while(!f.isDone()){System.out.println(System.currentTimeMillis()+"还没结束");}//结束后,获取结果System.out.println(f.get());}

Future 只实现了异步,而没有实现回调,主线程 get 时会阻塞,可以轮询以便获取异步调用是否完成。在实际的使用中建议使用 Guava ListenableFuture 来实现异步非阻塞,目的就是多任务异步执行,通过回调的方式来获取执行结果而不需轮询任务状态。

publicstaticvoidfutureDemo2(){ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(CommonThreadPool.getPool());IntStream.rangeClosed(1,10).forEach(i->{ListenableFuture<Integer>listenableFuture=executorService.submit(()->{//长时间的异步计算//Thread.sleep(3000);//然后返回结果return100;});Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){System.out.println("getlistenablefuture'sresultwithcallback"+result);}@OverridepublicvoidonFailure(Throwablet){t.printStackTrace();}},executorService);});}

CompletableFuture

Futrue 对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

在 Java 8 中,新增加了一个包含 50 个方法左右的类:CompletableFuture,提供了非常强大的 Future 的扩展功能。

CompletableFuture 能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture 弥补了 Future 模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过 thenAccept、thenApply、thenCompose 等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

下面将会一个个的例子来说明 CompletableFuture

异步执行

/****publicstaticCompletableFuture<Void>runAsync(Runnablerunnable)*publicstaticCompletableFuture<Void>runAsync(Runnablerunnable,Executorexecutor)*publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier)*publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier,Executorexecutor)**以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。**runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。**supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。*/publicstaticvoidrunAsyncExample()throwsExecutionException,InterruptedException{CompletableFuture<Void>cf=CompletableFuture.runAsync(()->{System.out.println("异常执行代码");});CompletableFuture<String>future=CompletableFuture.supplyAsync(()->{//长时间的计算任务return"·00";});System.out.println(future.join());}

计算结果完成时的处理

/****当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:**whenComplete(BiConsumer<?superT,?superThrowable>action)publicCompletableFuture<T>*whenCompleteAsync(BiConsumer<?superT,?superThrowable>action)publicCompletableFuture<T>*whenCompleteAsync(BiConsumer<?superT,?superThrowable>action,Executorexecutor)public*CompletableFuture<T>exceptionally(Function<Throwable,?extendsT>fn)**不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。*Java的CompletableFuture类总是遵循这样的原则**如果你希望不管CompletableFuture运行正常与否都执行一段代码,如释放资源,更新状态,记录日志等,但是同时不影响原来的执行结果。*那么你可以使用whenComplete方法。exceptionally非常类似于catch(),而whenComplete则非常类似于finally:*/publicstaticvoidwhenComplete()throwsExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(newSupplier<Integer>(){@OverridepublicIntegerget(){return2323;}});Future<Integer>f=future.whenComplete((v,e)->{System.out.println(v);System.out.println(e);});System.out.println(f.get());}

handle 是执行任务完成时对结果的处理

privatestaticclassHttpResponse{privatefinalintstatus;privatefinalStringbody;publicHttpResponse(finalintstatus,finalStringbody){this.status=status;this.body=body;}@OverridepublicStringtoString(){returnstatus+"-"+body;}}/***handle是执行任务完成时对结果的处理。handle方法和thenApply方法处理方式基本一样。不同的是handle是在任务完成后再执行,还可以处理异常的任务。*这组方法兼有whenComplete和转换的两个功能**public<U>CompletionStage<U>handle(BiFunction<?superT,Throwable,?extendsU>fn);*public<U>CompletionStage<U>handleAsync(BiFunction<?superT,Throwable,?extendsU>fn);*public<U>CompletionStage<U>handleAsync(BiFunction<?superT,Throwable,?extendsU>fn,Executorexecutor);**thenApply只可以执行正常的任务,任务出现异常则不执行thenApply方法。*public<U>CompletableFuture<U>thenApply(Function<?superT,?extendsU>fn)*public<U>CompletableFuture<U>thenApplyAsync(Function<?superT,?extendsU>fn)*public<U>CompletableFuture<U>thenApplyAsync(Function<?superT,?extendsU>fn,Executorexecutor)*/publicstaticvoidhandle()throwsExecutionException,InterruptedException{for(finalbooleanfailure:newboolean[]{false,true}){CompletableFuture<Integer>x=CompletableFuture.supplyAsync(()->{if(failure){thrownewRuntimeException("Oops,somethingwentwrong");}return42;});/***ReturnsanewCompletableFuturethat,whenthisCompletableFuturecompleteseithernormallyorexceptionally,*isexecutedwiththisstage'sresultandexceptionasargumentstothesuppliedfunction.*/CompletableFuture<HttpResponse>tryX=x//NotethattryXandxareofdifferenttype..handle((value,ex)->{if(value!=null){//Wegetachancetotransformtheresult...returnnewHttpResponse(200,value.toString());}else{//...orreturndetailsontheerrorusingtheExecutionException'smessage:returnnewHttpResponse(500,ex.getMessage());}});//Blocks(avoidthisinproductioncode!),andeitherreturnsthepromise'svalue:System.out.println(tryX.get());System.out.println("isCompletedExceptionally="+tryX.isCompletedExceptionally());}

转换

/***转换*@throwsExecutionException*@throwsInterruptedException*/publicstaticvoidthenApply()throwsExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>f=future.thenApplyAsync(i->i*10).thenApply(i->i.toString());//"1000"System.out.println(f.get());}

Action

/***上面的方法是当计算完成的时候,会生成新的计算结果(thenApply,handle),或者返回同样的计算结果whenComplete*CompletableFuture还提供了一种处理结果的方法,只对结果执行Action,而不返回新的计算值,因此计算值为Void:**publicCompletableFuture<Void>thenAccept(Consumer<?superT>action)*publicCompletableFuture<Void>thenAcceptAsync(Consumer<?superT>action)*publicCompletableFuture<Void>thenAcceptAsync(Consumer<?superT>action,Executorexecutor)*/publicstaticvoidaction()throwsExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenAccept(System.out::println);System.out.println(f.get());}

thenAccept

/***thenAcceptBoth以及相关方法提供了类似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。*runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果。**public<U>CompletableFuture<Void>thenAcceptBoth(CompletionStage<?extendsU>other,BiConsumer<?superT,?superU>action)*public<U>CompletableFuture<Void>thenAcceptBothAsync(CompletionStage<?extendsU>other,BiConsumer<?superT,?superU>action)*public<U>CompletableFuture<Void>thenAcceptBothAsync(CompletionStage<?extendsU>other,BiConsumer<?superT,?superU>action,Executorexecutor)*publicCompletableFuture<Void>runAfterBoth(CompletionStage<?>other,Runnableaction)*/publicstaticvoidthenAcceptBoth()throwsExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenAcceptBoth(CompletableFuture.completedFuture(10),(x,y)->System.out.println(x*y));System.out.println(f.get());}

thenRun

/***当计算完成的时候会执行一个Runnable,与thenAccept不同,Runnable并不使用CompletableFuture计算的结果。**publicCompletableFuture<Void>thenRun(Runnableaction)*publicCompletableFuture<Void>thenRunAsync(Runnableaction)*publicCompletableFuture<Void>thenRunAsync(Runnableaction,Executorexecutor)*/publicstaticvoidthenRun()throwsExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenRun(()->System.out.println("finished"));System.out.println(f.get());}

复合

/***thenCombine用来复合另外一个CompletionStage的结果。它的功能类似**A+*|*+------>C*+------^*B+**两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。**public<U,V>CompletableFuture<V>thenCombine(CompletionStage<?extendsU>other,BiFunction<?superT,?superU,?extendsV>fn)*public<U,V>CompletableFuture<V>thenCombineAsync(CompletionStage<?extendsU>other,BiFunction<?superT,?superU,?extendsV>fn)*public<U,V>CompletableFuture<V>thenCombineAsync(CompletionStage<?extendsU>other,BiFunction<?superT,?superU,?extendsV>fn,Executorexecutor)**其实从功能上来讲,它们的功能更类似thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值。*/publicstaticvoidthenCombine()throwsExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>future2=CompletableFuture.supplyAsync(()->{return"abc";});CompletableFuture<String>f=future.thenCombine(future2,(x,y)->y+"-"+x);System.out.println(f.get());//abc-100}

组合

publicstaticvoidfutureDemo2(){ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(CommonThreadPool.getPool());IntStream.rangeClosed(1,10).forEach(i->{ListenableFuture<Integer>listenableFuture=executorService.submit(()->{//长时间的异步计算//Thread.sleep(3000);//然后返回结果return100;});Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){System.out.println("getlistenablefuture'sresultwithcallback"+result);}@OverridepublicvoidonFailure(Throwablet){t.printStackTrace();}},executorService);});}0

Either

publicstaticvoidfutureDemo2(){ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(CommonThreadPool.getPool());IntStream.rangeClosed(1,10).forEach(i->{ListenableFuture<Integer>listenableFuture=executorService.submit(()->{//长时间的异步计算//Thread.sleep(3000);//然后返回结果return100;});Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){System.out.println("getlistenablefuture'sresultwithcallback"+result);}@OverridepublicvoidonFailure(Throwablet){t.printStackTrace();}},executorService);});}1

All

publicstaticvoidfutureDemo2(){ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(CommonThreadPool.getPool());IntStream.rangeClosed(1,10).forEach(i->{ListenableFuture<Integer>listenableFuture=executorService.submit(()->{//长时间的异步计算//Thread.sleep(3000);//然后返回结果return100;});Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){System.out.println("getlistenablefuture'sresultwithcallback"+result);}@OverridepublicvoidonFailure(Throwablet){t.printStackTrace();}},executorService);});}2

allOf 如果其中一个失败了如何快速结束所有?

publicstaticvoidfutureDemo2(){ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(CommonThreadPool.getPool());IntStream.rangeClosed(1,10).forEach(i->{ListenableFuture<Integer>listenableFuture=executorService.submit(()->{//长时间的异步计算//Thread.sleep(3000);//然后返回结果return100;});Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){System.out.println("getlistenablefuture'sresultwithcallback"+result);}@OverridepublicvoidonFailure(Throwablet){t.printStackTrace();}},executorService);});}3

我自己的一个 demo

publicstaticvoidfutureDemo2(){ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(CommonThreadPool.getPool());IntStream.rangeClosed(1,10).forEach(i->{ListenableFuture<Integer>listenableFuture=executorService.submit(()->{//长时间的异步计算//Thread.sleep(3000);//然后返回结果return100;});Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){System.out.println("getlistenablefuture'sresultwithcallback"+result);}@OverridepublicvoidonFailure(Throwablet){t.printStackTrace();}},executorService);});}4


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/4930.html