Stream 流串行与并行

Stream 串行操作

Stream 默认使用串行操作,串行操作通过单线程执行。在串行流中,元素是顺序处理的,一个元素处理完成后才处理下一个元素。

List<String> list = Arrays.asList("a", "b", "c", "d", "e");
Stream<String> stream = list.stream();
stream.forEach(System.out::println);

Stream 并行操作

在 Stream 中,最重要的一个概念就是并行操作。Stream API 可以声明性地通过 parallel() 来将串行流转换为并行流。 或者使用 parallelStream() 方法直接创建并行流。

List<String> list = Arrays.asList("a", "b", "c", "d", "e");
// 串行流转并行流
Stream<String> stream = list.stream().parallel();
List<String> list = Arrays.asList("a", "b", "c", "d", "e");
// 直接创建并行流
Stream<String> stream = list.parallelStream();

在并行流中,元素会被分成多个数据块,然后在不同的线程中分别处理每个数据块。在多核处理器上,可以显著提升性能。

并行流的操作是无序的,元素的处理顺序是不确定的。如果需要保证元素的顺序,可以使用 forEachOrdered() 方法。

List<String> list = Arrays.asList("a", "b", "c", "d", "e");
list.parallelStream().forEachOrdered(System.out::println);
// 输出:a b c d e

在底层实现上,Stream API 使用了 Fork/Join 框架来拆分任务和加速处理过程。Fork/Join 框架是 Java 7 提供的一个用于并行执行任务的框架,主要用于递归任务的并行处理。Fork/Join 框架的核心是工作窃取算法,该算法可以让空闲的线程从其他线程的任务队列中窃取任务来执行。

示例

处理 1w 条数据的集合,分别使用串行流和并行流来计算集合中所有元素的总和。对比两者的性能差异。

// 串行流
long start = System.currentTimeMillis();
int sum = IntStream.range(0, 10000).reduce(0, Integer::sum);
long end = System.currentTimeMillis();
System.out.println("串行流计算结果:" + sum + ",耗时:" + (end - start) + "ms");

// 并行流
start = System.currentTimeMillis();
sum = IntStream.range(0, 10000).parallel().reduce(0, Integer::sum);
end = System.currentTimeMillis();
System.out.println("并行流计算结果:" + sum + ",耗时:" + (end - start) + "ms");
// 串行流计算结果:49995000,耗时:1ms
// 并行流计算结果:49995000,耗时:3ms

通过上述的示例,我们可以看到并行操作的流耗时反而更长,这是因为并行流的线程切换和线程池的创建销毁等操作耗时较长,适用于大数据量的计算。当我们的数据量较小时,串行流的性能更好。因此在使用并行流时,需要根据实际情况来选择。

当我们增加数据量到 10 亿条数据时,再次对比两者的性能差异。

// 串行流
long start = System.currentTimeMillis();
int sum = IntStream.range(0, 1000000000).reduce(0, Integer::sum);
long end = System.currentTimeMillis();
System.out.println("串行流计算结果:" + sum + ",耗时:" + (end - start) + "ms");
// 并行流
start = System.currentTimeMillis();
sum = IntStream.range(0, 1000000000).parallel().reduce(0, Integer::sum);
end = System.currentTimeMillis();
System.out.println("并行流计算结果:" + sum + ",耗时:" + (end - start) + "ms");
// 串行流计算结果:49995000,耗时:349ms
// 并行流计算结果:49995000,耗时:68ms

通过上述示例,我们可以看到并行流的性能优势,因此当数据量较大时,使用并行流能更好的提高程序的性能。

最后更新于

这有帮助吗?