CompletableFuture
最后更新于
这有帮助吗?
在介绍CompletableFuture
之前,先简单介绍一下什么是同步和异步两个概念。例如我们去餐厅吃饭,一般都会经过如下几个步骤:点餐 > 服务员接单 > 等待饭菜 > 厨师做菜 > 服务员上菜 > 开始吃饭
。
如果使用同步编程,那么整个过程就是:
同步任务
如果使用异步,那么这个过程就可以划分为:
异步任务
同步就是按照任务的排列顺序执行,这种执行必须按照顺序执行,所以执行速度比较缓慢,而异步则不需要按照指定的顺序执行,它可以同时执行多个任务,等待异步任务完成,根据返回结果再执行特定的操作。
在 JDK8 之前的版本中,异步任务使用Future
实现,在使用Future
执行异步任务并且获得异步任务执行结果时,要么掉用阻塞的get()
方法,或者轮询调用isDone()
方法获取任务状态,判断是否为true
。这两种方法在执行时都会使主线程被迫等待。对性能产生一定影响,因此在 JDK8 版本中,新增了CompletableFuture
用于构建异步任务,该工具主要是对Future
的优化,通过使用观察者模式进行设计,实现异步回调进行异步编程,使开发者可以很方便的创建串行或并行任务,极大程度上简化了异步任务的编码。
runAsync
❌
从公共的commonPool线程中获取一个子线程,执行异步任务。并且该任务方法执行结束后。
supplyAsync
✅
从公共的commonPool线程池中获取一个子线程,执行制定的函数,并且该任务方法执行结束后。
任务处理方法一般指的是CompletableFuture中,用于以串行或并行的方式处理数据的静态方法或实例方法,使用这些方法可以实现任务的组合。
thenRun
❌
串行执行任务。并且该任务方法执行结束后,没有返回值。
thenRunAsync
❌
串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。
thenApply
✅
串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenApplyAsync
✅
串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
thenAccept
❌
串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。
thenAcceptAsync
❌
串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
handle
✅
串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
handleAsync
✅
串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。
whenComplete
❌
串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。
whenCompleteAsync
❌
串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。
thenCompose
✅
串行执行任务,按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenComposeAsync
✅
串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。
thenCombine
✅
并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenCombineAsync
✅
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。
thenAcceptBoth
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
thenAcceptBothAsync
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterBoth
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。
runAfterBothAsync
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
applyToEither
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
applyToEitherAsync
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
runAfterEither
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
runAfterEitherAsync
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。
acceptEither
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
acceptEitherAsync
❌
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。
allOf
✅
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。
anyOf
✅
并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。
任务结束指的是调用 CompletableFuture 中的实例方法,获取执行结果或者取消任务等,结束现有的任务链。
任务结束包含的方法如下:
get
✅
获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。
join
✅
获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。
getNow
✅
立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。
cancel
✅
取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。
查看任务状态方法用于查看 CompletableFuture 任务执行状态,其中包含的方法如下:
isDone
✅
查看任务是否执行完成,如果当前阶段执行完成(无论是正常完成还是异常完成)则返回 true,否则返回 false。
isCancelled
✅
查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。
isCompletedExceptionally
✅
查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。
设置任务结果方法用于设置 CompletableFuture 任务结果,使其返回指定结果,其中包含的方法如下:
obtrudeValue
❌
设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。
obtrudeException
❌
设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。
complete
✅
设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。
completeException
✅
设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。
exceptionally
❌
判断上一个任务执行时是否发生异常,如果是则将异常作为当前方法参数,然后对其进行异常处理。
使用CompletableFuture执行异步任务时,会首先判断parallelism数量,该参数一般跟当前服务器CPU数量相关:
如果CPU数量大于1,CompletableFuture执行任务时,每次都会创建一个新的线程执行任务;
如果CPU数量大于1,CompletableFuture执行任务时,将使用公共的ForkJoinPool.commnPool线程池;
一般情况下,在多核CPU服务器中运行应用,都会默认使用ForkJoinPool.commonPool线程池,该线程池是基于Fork和Join组合实现的,执行过程中可以讲大的任务拆分为多个小任务并行执行,并且支持以窃取的方式,线程池中的线程在执行完自己工作队列中的任务后,可以窃取别的线程工作队列中没有执行完成的任务,协助其执行,尽可能使用并行的方式快速完成全部任务,所以ForkJoinPool.commonPool线程池更适合执行计算密集型任务,而不太适合IO密集型任务。
公共的ForkJoinPool.commonPool()
线程池是 JVM 进程中所有CompletableFuture 和 Stream 共享,如果全局上下文环境中存在大量使用 ForkJoinPool.commonPool()
线程池的任务,并且这些任务中包含大量的 IO 操作,那么该线程池性能将会受到很大影响。所以,一般情况下我们使用 CompletableFuture 时,需要避免使用公共线程池,而是使用自定义的线程池,并且设置不同的任务使用不同类型的线程池,以适用不同的任务场景。
在CompletableFuture中的方法,大部分方法都存在可以接收自定义线程池Executor参数的重载方法。因此,我们可以使用接收自定义线程池的CompletableFuture方法,将我们自定义的线程池作为参数,传入其中,使用该线程池中的线程执行任务。
CPU密集型任务也称为计算密集型任务,值的是任务执行过程中需要大量计算,没有阻塞,且消耗大量CPU资源。比如视频解码、类型转换等。
IO密集型任务,指的是需要进行大量磁盘IO读取,网络操作等,执行过程会造成堵塞,需要创建大量的线程执行任务。比如文件读写、网络请求等。
在Java中常用的线程池按照执行方法划分的话,可以划分为ForkJoinPool和ExecutorService两种。
ForkJoinPool在执行过程中需要消耗大量的CPU分解任务,然后进行计算,因此适合执行计算密集型任务。
ExecutorService是传统方式的线程池,使用池化管理线程,提前将若干个线程放入池中,当我们需要时就从池中获取线程执行任务,执行过程不会对任务进行拆解,并且使用完毕后不需要销毁而是放回池中,方便下次使用,从而减少创建和销毁线程的性能开销。所以这种线程适合进行大量IO操作的任务。
一般情况下,业务上使用的线程池都会设置线程池的线程大小,设置的线程过多会造成线程浪费,过少会造成任务堆积。最优解就是配置不同的线程数进行测试,然后判断应用设置线程池大小为多大性能最优。
这里可以参考网上的一套万能的推荐配置,跟操作系统的 CPU 数量相关,如下:
CPU 密集型任务: N + 1
IO 密集型任务: 2N + 1
还有一种针对 IO 密集型任务设置估算公式,按公式进行配置这种性能最优,公式如下:
估算公式: = (线程等待时间 + 线程 CPU 时间 / 线程 CPU 时间) * CPU 核心数
如果是既有 IO 操作的步骤,也有比较消耗 CPU 的步骤,这种混合型任务可以进行拆分,将不同的任务使用不同的线程池。