waterStream 流式处理

Stream 是 Node.js 内置的数据处理机制(基于 EventEmitter 实现),用于以流方式逐块处理数据,而非一次性加载全部数据到内存中。在 Node.js 中,数据的产生、传输、处理都可以抽象为流,比如文件读写、网络请求响应、标准输入输出等场景,本质上都是流的操作。

解决的核心问题

JavaScript 与 Node.js 早期处理大数据(如大文件、大流量网络数据)时,若一次性将全部数据加载到内存,会带来两个核心问题:

  • 内存溢出:大文件(如几 GB 的视频、压缩包)一次性加载会耗尽 V8 堆内存(即使 Buffer 使用堆外内存,也会造成内存占用飙升)。

  • 性能低下:数据处理需要等待全部数据加载完成后才能开始,存在明显的等待延迟,且大内存数据的 GC 回收会带来额外性能开销。

3. 核心特性

  • 基于 EventEmitter:流的所有操作都通过事件驱动实现(如 data 事件接收数据、end 事件标记数据传输完成)。

  • 非阻塞 I/O:结合 Node.js 异步非阻塞特性,流操作不会阻塞主线程,保证应用的高并发能力。

  • 可组合性:通过「管道(pipe)」机制,支持多个流串联组合,实现复杂的数据处理链路(如「文件读取流 → 数据转换流 → 文件写入流」)。

  • 数据格式:默认处理的是 Buffer/UTF-8 字符串数据,可配置支持其他编码格式。

Readable Stream(可读流)

数据的生产者,用于读取数据并将数据输出到下游(如其他流,回调函数),不会主动推送数据,需处于「流动模式」或「暂停模式」才能输出数据。

import fs from "node:fs";

// 创建文件可读流
const readableStream = fs.createReadStream("./example.txt", {
  encoding: "utf-8", // 配置数据编码,默认返回 Buffer
  highWaterMark: 64 * 1024, // 每次读取的块大小(默认 64KB)
});

// 监听数据块
readableStream.on("data", chunk => {
  console.log(`读取到数据块,长度:${chunk.length}`);
  console.log(`数据库内容:${chunk.substring(0, 50)}...`);
});

// 监听读取完成
readableStream.on("end", () => {
  console.log("文件数据读取完成");
});

// 监听读取错误
readableStream.on("error", err => {
  console.error("文件读取失败:", err.message);
});

Writable Stream

数据的消费者,用于接收上游流传递的数据并写入到目标存储(文件、网络、终端等),不产生数据,仅负责消费数据。

核心特点:

  • 提供 write() 方法写入单个数据块,end() 方法标记写入完成(后续不可再写入数据)。

  • 背压机制:当可写流的缓冲区满时,会反馈给可读流,暂停数据推送,避免数据溢出。

Duplex Stream(双工流)

同时具备可读流和可写流的能力,两个流共享同一个实例,但内部有独立的缓冲区和操作逻辑,可读端和可写端可以独立工作。

net.Socket:TCP 套接字流(即可以读取客户端发送的数据,也可以向客户端写入数据)。

crypto.createCipheriv()/crypto.createDecipheriv():加密/解密双工流(写入明文数据,读取加密数据;或写入加密数据,读取明文数据)。

特点

可读端和可写端遵循个字的流协议(可读流的data/end事件,可写流的write/end方法)。

双工流的可读和可写式解耦的,写入的数据不会直接作为可读数据输出(与转换流的核心区别)。

Transform Stream(转换流)

特殊的双工流,具备可读端和可写端,但可读端的数据是由可写端的数据经过转换处理后生成的。

典型场景:

  • zlib.createGzip()/zlib.createGunzip():压缩/解压缩转换流(写入原始数据,读取 gzip 压缩数据。反之亦然)。

  • csv-parser:CSV 解析转换流(写入 CSV 文本数据,读取解析后的 JSON 数据)

特点:

继承自 Duplex 流,但其可读端的数据来源于可写端的转换结果。

提供 _transform() 方法(自定义转换流时需实现),用于定义数据转换逻辑。

Stream 的核心事件与方法

核心事件

  • data:当可读流处于「流动模式」,且有数据块可供读取时触发,回调参数为 chunk(数据块,Buffer / 字符串)。

  • end:可读流的所有数据已读取完毕,且没有更多数据可供读取时触发(仅在正常读取完成后触发,错误时不触发)。

  • pause:可读流被暂停时触发(调用 pause() 方法或背压机制触发暂停)。

  • resume:可读流被恢复为流动模式时触发(调用 resume() 方法)。

  • readable:可读流的缓冲区中有数据可供读取时触发(暂停模式下常用,需手动调用 read() 方法读取数据)。

核心方法

  • read([size]):暂停模式下,手动从缓冲区读取指定 size 字节的数据,返回数据块(无数据时返回 null)。

  • pause():将可读流从「流动模式」切换为「暂停模式」,停止触发 data 事件,数据会暂存到缓冲区。

  • resume():将可读流从「暂停模式」切换为「流动模式」,恢复触发 data 事件,继续推送数据。

  • pipe(destination):将可读流的数据通过「管道」传输到目标可写流 / 双工流 / 转换流(自动处理背压,推荐优先使用)。

  • unpipe([destination]):取消可读流与目标流的管道关联,停止数据传输。

可写流(Writable)特有事件与方法

核心事件

  • finish:调用 end() 方法后,所有数据已成功写入底层资源,且缓冲区已清空时触发(标记写入完成)。

  • drain:可写流的缓冲区从「满」状态变为「可写入」状态时触发(背压机制中,用于通知可读流恢复数据推送)。

  • pipe:当有可读流通过 pipe() 方法将数据传输到当前可写流时触发。

  • unpipe:当可读流通过 unpipe() 方法取消与当前可写流的管道关联时触发。

核心方法

  • write(chunk[, encoding][, callback]):向可写流写入一个数据块,返回布尔值:

    • true:缓冲区未满,可继续写入下一个数据块。

    • false:缓冲区已满,需等待 drain 事件触发后再写入(背压机制核心)。

  • end([chunk][, encoding][, callback]):标记可写流写入完成,可选写入最后一个数据块,触发 finish 事件,后续不可再调用 write()

  • cork():暂停数据写入到底层资源,将数据暂存到内部缓冲区(批量写入优化)。

  • uncork():恢复数据写入,将 cork() 暂存的缓冲区数据一次性写入底层资源。

管道(Pipe)与链式操作

pipe()方法是 Stream 最核心的组合方法,其核心价值在于:

  • 简化流之间的数据传输:无需手动监听 data 事件和调用 write() 方法,自动完成[可读流->可写流]的数据传输。

  • 自动处理背压(Backpressure):当可写流缓冲区满时,自动暂停可读流的推送;当可写流缓冲区可用时,自动恢复可读流的数据传输,避免数据丢失和内存溢出。

  • 自动错误传播:简化错误处理逻辑,减少手动监听 error 事件的冗余代码。

多流组合

多个流可以通过 pipe() 链式调用,组成复杂的数据处理链路,核心格式:可读流 → 转换流1 → 转换流2 → ... → 可写流

管道的取消

当需要提前终止流之间的管道传输时,可调用 unpipe() 方法。

背压机制详解

背压式流式数据处理中不可避免的问题,其核心产生原因是:数据的生成速度(可读流)大于数据的消费速度(可写流)。

具体流程:

  1. 可读流快速产生数据,通过 write() 方法推送给可写流。

  2. 可写流的缓冲区有固定容量,当写入的数据超过缓冲区容量时,缓冲区会被填满。

  3. 此时可写流无法及时将缓冲区的数据写入底层资源(如文件、网络),若可读流继续推送数据,会导致缓冲区溢出、数据丢失、内存飙升。

背压的自动处理(pipe() 内置支持)

pipe() 方法已内置背压处理逻辑,无需手动干预,其处理流程:

  1. 当可写流的 write() 方法返回 false(缓冲区满)时,pipe() 会自动调用可读流的 pause() 方法,暂停可读流的数据推送。

  2. 当可写流的缓冲区清空,触发 drain 事件时,pipe() 会自动调用可读流的 resume() 方法,恢复可读流的数据推送。

  3. 循环上述过程,直到所有数据处理完成,保证数据生产速度与消费速度匹配。

手动处理背压

若不使用 pipe() 方法,手动监听 data 事件写入数据,需手动处理背压。

Stream 的典型应用场景

大文件复制/迁移

HTTP 服务端响应大文件下载

当客户端请求下载大文件时,使用文件可读流直接通过 HTTP 响应流返回,无需加载全部文件到内存:

数据压缩与解压缩

终端输入输出交互

利用 process.stdin(可读流)和 process.stdout(可写流),实现终端的输入输出交互:

最后更新于