Java
Python前端运维数据库
Java
Java
  • 新特性
    • Record
    • Optional
  • 面向对象
    • 面向对象基础
    • 构造方法
    • 继承与多态
    • 接口
    • 修饰符
    • 代码块
    • 接口(Interface)
    • 枚举类
  • IO流
    • IO
      • 字节流
      • 字符流
      • 缓冲流
      • 转换流
      • 操作ZIP
      • File 对象
    • NIO
      • Channel和Buffer
      • 异步文件通道AsynchronousFileChannel
      • Selector
      • Path/Files/Pipe
  • 反射
  • 内存分配
  • 集合
    • 简介
    • List
    • Set
    • Map
    • EnumMap
  • 日期与时间
    • Date和Calendar
    • Java8 新时间 ✨
      • LocalDateTime
      • ZonedDateTime
      • Duration
    • 时间格式化
      • SimpleDateFromat
      • DateTimeFormatter ✨
    • Instant
    • 实践
  • 网络编程
    • IP 地址
    • 网络模型
    • TCP 编程
    • UDP 编程
    • HTTP 编程
  • 加密和安全
  • 并发编程
    • 多线程
    • 线程与进程的区别
    • 线程组和线程优先级
    • 线程池
    • 线程锁
  • 异步任务
    • Future
    • CompletableFuture
      • 开启异步任务
      • 串行任务方法
      • 并行任务方法
      • 任务结束方法
      • 异常处理方法
      • 查看状态方法
      • 设置任务结果方法
  • 执行系统命令
  • Stream 流
    • Stream 流的创建
    • Stream 流串行与并行
    • Stream 流中间操作
    • Stream 流终端操作
  • Lambda 表达式
    • Lambda 表达式简介
    • Lambda 表达式语法
    • 方法引用
  • String
  • StringBuffer
由 GitBook 提供支持
在本页
  • 什么是同步和异步
  • 简介
  • CompletableFuture 中的分类
  • 任务开启方法
  • 任务处理方法
  • 任务结束方法
  • 查看任务状态方法
  • 设置任务结果方法
  • 任务异常处理方法
  • CompletableFuture中的线程池
  • 默认线程池
  • 自定义线程池
  • CPU密集型任务和IO密集型任务
  • 不同类型任务对线程池的选择
  • 线程大小配置推荐

这有帮助吗?

  1. 异步任务

CompletableFuture

上一页Future下一页开启异步任务

最后更新于1年前

这有帮助吗?

什么是同步和异步

在介绍CompletableFuture 之前,先简单介绍一下什么是同步和异步两个概念。例如我们去餐厅吃饭,一般都会经过如下几个步骤:点餐 > 服务员接单 > 等待饭菜 > 厨师做菜 > 服务员上菜 > 开始吃饭。

如果使用同步编程,那么整个过程就是:

img

同步任务

如果使用异步,那么这个过程就可以划分为:

异步任务

同步就是按照任务的排列顺序执行,这种执行必须按照顺序执行,所以执行速度比较缓慢,而异步则不需要按照指定的顺序执行,它可以同时执行多个任务,等待异步任务完成,根据返回结果再执行特定的操作。

简介

在 JDK8 之前的版本中,异步任务使用Future实现,在使用Future执行异步任务并且获得异步任务执行结果时,要么掉用阻塞的get() 方法,或者轮询调用isDone()方法获取任务状态,判断是否为true。这两种方法在执行时都会使主线程被迫等待。对性能产生一定影响,因此在 JDK8 版本中,新增了CompletableFuture用于构建异步任务,该工具主要是对Future 的优化,通过使用观察者模式进行设计,实现异步回调进行异步编程,使开发者可以很方便的创建串行或并行任务,极大程度上简化了异步任务的编码。

CompletableFuture 中的分类

任务开启方法

返回值
描述

runAsync

❌

从公共的commonPool线程中获取一个子线程,执行异步任务。并且该任务方法执行结束后。

supplyAsync

✅

从公共的commonPool线程池中获取一个子线程,执行制定的函数,并且该任务方法执行结束后。

任务处理方法

任务处理方法一般指的是CompletableFuture中,用于以串行或并行的方式处理数据的静态方法或实例方法,使用这些方法可以实现任务的组合。

串行任务

方法名称
有返回值
描述

thenRun

❌

串行执行任务。并且该任务方法执行结束后,没有返回值。

thenRunAsync

❌

串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。

thenApply

✅

串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

thenApplyAsync

✅

串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

thenAccept

❌

串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。

thenAcceptAsync

❌

串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。

handle

✅

串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

handleAsync

✅

串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。

whenComplete

❌

串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。

whenCompleteAsync

❌

串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。

thenCompose

✅

串行执行任务,按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

thenComposeAsync

✅

串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。

并行任务

方法名称
有返回值
描述

thenCombine

✅

并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。

thenCombineAsync

✅

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。

thenAcceptBoth

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

thenAcceptBothAsync

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

runAfterBoth

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。

runAfterBothAsync

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

applyToEither

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

applyToEitherAsync

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

runAfterEither

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

runAfterEitherAsync

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。

acceptEither

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

acceptEitherAsync

❌

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。

allOf

✅

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。

anyOf

✅

并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。

任务结束方法

任务结束指的是调用 CompletableFuture 中的实例方法,获取执行结果或者取消任务等,结束现有的任务链。

任务结束包含的方法如下:

方法名称
有返回值
描述

get

✅

获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。

join

✅

获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。

getNow

✅

立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。

cancel

✅

取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。

查看任务状态方法

查看任务状态方法用于查看 CompletableFuture 任务执行状态,其中包含的方法如下:

方法名称
有返回值
描述

isDone

✅

查看任务是否执行完成,如果当前阶段执行完成(无论是正常完成还是异常完成)则返回 true,否则返回 false。

isCancelled

✅

查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。

isCompletedExceptionally

✅

查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。

设置任务结果方法

设置任务结果方法用于设置 CompletableFuture 任务结果,使其返回指定结果,其中包含的方法如下:

方法名称
有返回值
描述

obtrudeValue

❌

设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。

obtrudeException

❌

设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。

complete

✅

设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

completeException

✅

设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。

任务异常处理方法

方法名称
返回值
描述

exceptionally

❌

判断上一个任务执行时是否发生异常,如果是则将异常作为当前方法参数,然后对其进行异常处理。

CompletableFuture中的线程池

默认线程池

使用CompletableFuture执行异步任务时,会首先判断parallelism数量,该参数一般跟当前服务器CPU数量相关:

  • 如果CPU数量大于1,CompletableFuture执行任务时,每次都会创建一个新的线程执行任务;

  • 如果CPU数量大于1,CompletableFuture执行任务时,将使用公共的ForkJoinPool.commnPool线程池;

一般情况下,在多核CPU服务器中运行应用,都会默认使用ForkJoinPool.commonPool线程池,该线程池是基于Fork和Join组合实现的,执行过程中可以讲大的任务拆分为多个小任务并行执行,并且支持以窃取的方式,线程池中的线程在执行完自己工作队列中的任务后,可以窃取别的线程工作队列中没有执行完成的任务,协助其执行,尽可能使用并行的方式快速完成全部任务,所以ForkJoinPool.commonPool线程池更适合执行计算密集型任务,而不太适合IO密集型任务。

公共的ForkJoinPool.commonPool() 线程池是 JVM 进程中所有CompletableFuture 和 Stream 共享,如果全局上下文环境中存在大量使用 ForkJoinPool.commonPool() 线程池的任务,并且这些任务中包含大量的 IO 操作,那么该线程池性能将会受到很大影响。所以,一般情况下我们使用 CompletableFuture 时,需要避免使用公共线程池,而是使用自定义的线程池,并且设置不同的任务使用不同类型的线程池,以适用不同的任务场景。

自定义线程池

在CompletableFuture中的方法,大部分方法都存在可以接收自定义线程池Executor参数的重载方法。因此,我们可以使用接收自定义线程池的CompletableFuture方法,将我们自定义的线程池作为参数,传入其中,使用该线程池中的线程执行任务。

public class Main {
    public static ThreadPoolExecutor threadPoolExecutor() {
        // 工作队列
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);
        // 拒绝策略
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
        // 核心线程数10,最大线程数20,空闲线程存活时间60秒,工作队列200,拒绝策略AbortPolicy
        return new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, workQueue, handler);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = threadPoolExecutor();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture", executor);
        // 掉用join方法等待异步任务执行完成
        System.out.println(future.join());
    }
}

CPU密集型任务和IO密集型任务

CPU密集型任务也称为计算密集型任务,值的是任务执行过程中需要大量计算,没有阻塞,且消耗大量CPU资源。比如视频解码、类型转换等。

IO密集型任务,指的是需要进行大量磁盘IO读取,网络操作等,执行过程会造成堵塞,需要创建大量的线程执行任务。比如文件读写、网络请求等。

不同类型任务对线程池的选择

在Java中常用的线程池按照执行方法划分的话,可以划分为ForkJoinPool和ExecutorService两种。

ForkJoinPool在执行过程中需要消耗大量的CPU分解任务,然后进行计算,因此适合执行计算密集型任务。

ExecutorService是传统方式的线程池,使用池化管理线程,提前将若干个线程放入池中,当我们需要时就从池中获取线程执行任务,执行过程不会对任务进行拆解,并且使用完毕后不需要销毁而是放回池中,方便下次使用,从而减少创建和销毁线程的性能开销。所以这种线程适合进行大量IO操作的任务。

线程大小配置推荐

一般情况下,业务上使用的线程池都会设置线程池的线程大小,设置的线程过多会造成线程浪费,过少会造成任务堆积。最优解就是配置不同的线程数进行测试,然后判断应用设置线程池大小为多大性能最优。

这里可以参考网上的一套万能的推荐配置,跟操作系统的 CPU 数量相关,如下:

  • CPU 密集型任务: N + 1

  • IO 密集型任务: 2N + 1

还有一种针对 IO 密集型任务设置估算公式,按公式进行配置这种性能最优,公式如下:

  • 估算公式: = (线程等待时间 + 线程 CPU 时间 / 线程 CPU 时间) * CPU 核心数

如果是既有 IO 操作的步骤,也有比较消耗 CPU 的步骤,这种混合型任务可以进行拆分,将不同的任务使用不同的线程池。

img