异步任务

什么是 CompletableFuture

在 Java 8 中引入了 CompletableFuture用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并通过回调可以在主线程中得到异步任务的执行状态,是否完成和异常信息等。

CompletableFuture除了提供更为好用和强大的Future特性之外,还提供了函数式编程、异步任务编排组合(将多个异步任务串联起来,组成一个完整的链式调用)等能力,提升了异步编程模型。

为什么要引入CompletableFuture

在一些业务场景中我们需要使用多线程异步执行任务,加快任务执行速度,所以JDK5新增了Future接口,用于描述一个异步计算的结果。

虽然Future提供了一步执行任务的能力,但是Future在实际使用过程中存在着一些局限性,比如不支持异步任务的编排组合、获取计算结果的get()方法为阻塞调用。或者通过轮询的方式判断Future.isDone任务是否结束,在获取结果。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> {
    Thread.sleep(2000);
    return "hello world";
});
while (true) {
    if (future.isDone()) {
        System.out.println(future.get());
        break;
    }
}

CompletableFuture 使用

Drawing

线程串行化

其中thenApply是同步的,thenApplyAsync是异步的。

System.out.println("线程:" + Thread.currentThread().getName() + "===>主线程开始执行");
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>子线程开始执行---001");
}).thenApply(v -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>子线程执行完毕---002");
    return true;
}).thenApply(s -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>子线程执行完毕---003");
    return "hello world";
});
System.out.println("线程:" + Thread.currentThread().getName() + "==>获取异步任务的执行结果:" + future.join());

运行结果如下:

线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>子线程开始执行---001
线程:main==>子线程执行完毕---002
线程:main==>子线程执行完毕---003
线程:main==>获取异步任务的执行结果:hello world

如果将thenApply换成thenApplyAsync,任务将在子线程执行,执行结果如下:

线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>子线程开始执行---001
线程:ForkJoinPool.commonPool-worker-1==>子线程执行完毕---002
线程:ForkJoinPool.commonPool-worker-1==>子线程执行完毕---003
线程:main==>获取异步任务的执行结果:hello world

这两个方法的区别在于谁去执行这个任务,如果使用thenApplyAsync,那么执行的线程是从ForkJoinPool.commonPool()中获取不同的线程进行执行,如果使用thenApplyrunAsync方法执行速度特别快,那么thenApply任务就是主线程进行执行,如果执行特别慢的话就是和runAsync执行线程一样。

thenApplyhethenApplyAsync的区别:

System.out.println("=====================  sleep 5s  ========================");
System.out.println("线程:" + Thread.currentThread().getName() + "===>主线程开始执行");
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("线程:" + Thread.currentThread().getName() + "==>runAsync执行完毕");
}).thenApply(v -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---002");
    return true;
}).thenApply(s -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---003");
    return "hello world";
});
System.out.println("线程:" + Thread.currentThread().getName() + "==>获取异步任务的执行结果:" + future.join());
System.out.println("=============================================");
System.out.println("=====================  no sleep  ========================");
System.out.println("线程:" + Thread.currentThread().getName() + "===>主线程开始执行");
CompletableFuture<String> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>runAsync执行完毕");
}).thenApply(v -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---001");
    return true;
}).thenApply(s -> {
    System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---002");
    return "hello world";
});
System.out.println("线程:" + Thread.currentThread().getName() + "==>获取异步任务的执行结果:" + future1.join());
System.out.println("=============================================");

执行结果如下:

=====================  sleep 5s  ========================
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>runAsync执行完毕
线程:ForkJoinPool.commonPool-worker-1==>thenApply执行完毕---001
线程:ForkJoinPool.commonPool-worker-1==>thenApply执行完毕---002
线程:main==>获取异步任务的执行结果:hello world
=============================================
=====================  no sleep  ========================
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>runAsync执行完毕
线程:main==>thenApply执行完毕---001
线程:main==>thenApply执行完毕---002
线程:main==>获取异步任务的执行结果:hello world
=============================================

可以看到,如果runAsync方法执行速度慢的话,thenApply方法执行线程和runAsync执行线程相同,如果runAsync执行速度快的话,那么thenApply方法执行线程和Main方法执行线程相同

theApply:获取上一个任务的执行结果,并在当前任务的返回结果。
System.out.println("主线程开始执行");
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
    System.out.println("子线程开始执行---01");
}).thenApply(v -> {
    System.out.println("子线程执行完毕---02");
    return true;
}).thenApply(s -> {
    System.out.println("子线程执行完毕---03");
    return "hello world";
});
System.out.println("获取异步任务的执行结果:" + future.join());
System.out.println("主线程执行结束");
thenAccept:获取上一个任务的执行结果,并消费处理结果,无返回值。
System.out.println("主线程开始执行");
CompletableFuture.runAsync(() -> {
    System.out.println("子线程开始执行---01");
}).thenAccept(v -> {
    System.out.println("子线程执行完毕---02");
}).thenAccept(s -> {
    System.out.println("子线程执行完毕---03");
});
System.out.println("主线程执行结束");
thenRun:获取不到上个任务的执行结果,无返回值,只是按顺序执行。
System.out.println("主线程开始执行");
CompletableFuture.runAsync(() -> {
    System.out.println("子线程开始执行---01");
}).thenRun(() -> {
    System.out.println("子线程执行完毕---02");
}).thenRun(() -> {
    System.out.println("子线程执行完毕---03");
});
System.out.println("主线程执行结束");
Drawing
  • runAfterBoth:

System.out.println("主线程开始执行");
CompletableFuture<String> future1 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("future1---子线程开始执行---01");
}).thenApply(v -> {
    System.out.println("future1---子线程执行完毕---02");
    return "hello world";
});
CompletableFuture<String> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("future2---子线程开始执行---01");
}).thenApply(v -> {
    System.out.println("future2---子线程执行完毕---02");
    return "hello world";
});
future1.runAfterBothAsync(future2, () -> {
    System.out.println("future1和future2都执行完毕了");
  • thenCombine

异常处理

CompletableFuture allOf 获取所有线程结果

https://blog.csdn.net/winterking3/article/details/116496121?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522169961985916800182742170%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=169961985916800182742170&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-116496121-null-null.nonecase&utm_term=CompletableFuture&spm=1018.2226.3001.4450

whenComplete:

两个任务组合

多任务组合

https://mikechen.cc/15629.html

https://blog.csdn.net/li1325169021/article/details/126494107

最后更新于

这有帮助吗?