什么是 CompletableFuture
在 Java 8 中引入了 CompletableFuture
用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并通过回调可以在主线程中得到异步任务的执行状态,是否完成和异常信息等。
CompletableFuture
除了提供更为好用和强大的Future
特性之外,还提供了函数式编程、异步任务编排组合(将多个异步任务串联起来,组成一个完整的链式调用)等能力,提升了异步编程模型。
为什么要引入CompletableFuture
在一些业务场景中我们需要使用多线程异步执行任务,加快任务执行速度,所以JDK5新增了Future接口,用于描述一个异步计算的结果。
虽然Future
提供了一步执行任务的能力,但是Future
在实际使用过程中存在着一些局限性,比如不支持异步任务的编排组合、获取计算结果的get()
方法为阻塞调用。或者通过轮询的方式判断Future.isDone
任务是否结束,在获取结果。
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> {
Thread.sleep(2000);
return "hello world";
});
while (true) {
if (future.isDone()) {
System.out.println(future.get());
break;
}
}
CompletableFuture 使用
线程串行化
其中thenApply
是同步的,thenApplyAsync
是异步的。
System.out.println("线程:" + Thread.currentThread().getName() + "===>主线程开始执行");
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>子线程开始执行---001");
}).thenApply(v -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>子线程执行完毕---002");
return true;
}).thenApply(s -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>子线程执行完毕---003");
return "hello world";
});
System.out.println("线程:" + Thread.currentThread().getName() + "==>获取异步任务的执行结果:" + future.join());
运行结果如下:
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>子线程开始执行---001
线程:main==>子线程执行完毕---002
线程:main==>子线程执行完毕---003
线程:main==>获取异步任务的执行结果:hello world
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>子线程开始执行---001
线程:ForkJoinPool.commonPool-worker-1==>子线程执行完毕---002
线程:ForkJoinPool.commonPool-worker-1==>子线程执行完毕---003
线程:main==>获取异步任务的执行结果:hello world
如果将thenApply
换成thenApplyAsync
,任务将在子线程执行,执行结果如下:
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>子线程开始执行---001
线程:ForkJoinPool.commonPool-worker-1==>子线程执行完毕---002
线程:ForkJoinPool.commonPool-worker-1==>子线程执行完毕---003
线程:main==>获取异步任务的执行结果:hello world
这两个方法的区别在于谁去执行这个任务,如果使用thenApplyAsync
,那么执行的线程是从ForkJoinPool.commonPool()
中获取不同的线程进行执行,如果使用thenApply
,runAsync
方法执行速度特别快,那么thenApply
任务就是主线程进行执行,如果执行特别慢的话就是和runAsync
执行线程一样。
thenApplyhe
和thenApplyAsync
的区别:
System.out.println("===================== sleep 5s ========================");
System.out.println("线程:" + Thread.currentThread().getName() + "===>主线程开始执行");
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程:" + Thread.currentThread().getName() + "==>runAsync执行完毕");
}).thenApply(v -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---002");
return true;
}).thenApply(s -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---003");
return "hello world";
});
System.out.println("线程:" + Thread.currentThread().getName() + "==>获取异步任务的执行结果:" + future.join());
System.out.println("=============================================");
System.out.println("===================== no sleep ========================");
System.out.println("线程:" + Thread.currentThread().getName() + "===>主线程开始执行");
CompletableFuture<String> future1 = CompletableFuture.runAsync(() -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>runAsync执行完毕");
}).thenApply(v -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---001");
return true;
}).thenApply(s -> {
System.out.println("线程:" + Thread.currentThread().getName() + "==>thenApply执行完毕---002");
return "hello world";
});
System.out.println("线程:" + Thread.currentThread().getName() + "==>获取异步任务的执行结果:" + future1.join());
System.out.println("=============================================");
执行结果如下:
===================== sleep 5s ========================
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>runAsync执行完毕
线程:ForkJoinPool.commonPool-worker-1==>thenApply执行完毕---001
线程:ForkJoinPool.commonPool-worker-1==>thenApply执行完毕---002
线程:main==>获取异步任务的执行结果:hello world
=============================================
===================== no sleep ========================
线程:main===>主线程开始执行
线程:ForkJoinPool.commonPool-worker-1==>runAsync执行完毕
线程:main==>thenApply执行完毕---001
线程:main==>thenApply执行完毕---002
线程:main==>获取异步任务的执行结果:hello world
=============================================
可以看到,如果runAsync
方法执行速度慢的话,thenApply
方法执行线程和runAsync
执行线程相同,如果runAsync
执行速度快的话,那么thenApply
方法执行线程和Main
方法执行线程相同
theApply:获取上一个任务的执行结果,并在当前任务的返回结果。
System.out.println("主线程开始执行");
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
System.out.println("子线程开始执行---01");
}).thenApply(v -> {
System.out.println("子线程执行完毕---02");
return true;
}).thenApply(s -> {
System.out.println("子线程执行完毕---03");
return "hello world";
});
System.out.println("获取异步任务的执行结果:" + future.join());
System.out.println("主线程执行结束");
thenAccept:获取上一个任务的执行结果,并消费处理结果,无返回值。
System.out.println("主线程开始执行");
CompletableFuture.runAsync(() -> {
System.out.println("子线程开始执行---01");
}).thenAccept(v -> {
System.out.println("子线程执行完毕---02");
}).thenAccept(s -> {
System.out.println("子线程执行完毕---03");
});
System.out.println("主线程执行结束");
thenRun:获取不到上个任务的执行结果,无返回值,只是按顺序执行。
System.out.println("主线程开始执行");
CompletableFuture.runAsync(() -> {
System.out.println("子线程开始执行---01");
}).thenRun(() -> {
System.out.println("子线程执行完毕---02");
}).thenRun(() -> {
System.out.println("子线程执行完毕---03");
});
System.out.println("主线程执行结束");
System.out.println("主线程开始执行");
CompletableFuture<String> future1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("future1---子线程开始执行---01");
}).thenApply(v -> {
System.out.println("future1---子线程执行完毕---02");
return "hello world";
});
CompletableFuture<String> future2 = CompletableFuture.runAsync(() -> {
System.out.println("future2---子线程开始执行---01");
}).thenApply(v -> {
System.out.println("future2---子线程执行完毕---02");
return "hello world";
});
future1.runAfterBothAsync(future2, () -> {
System.out.println("future1和future2都执行完毕了");
异常处理
CompletableFuture allOf 获取所有线程结果
whenComplete:
两个任务组合
多任务组合