失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 聊聊Node.js stream 模块 看看如何构建高性能的应用

聊聊Node.js stream 模块 看看如何构建高性能的应用

时间:2022-02-12 22:06:23

相关推荐

聊聊Node.js stream 模块 看看如何构建高性能的应用

web前端|js教程

Node.js,stream,Node应用

web前端-js教程

本篇文章带大家了解 Node stream 模块,介绍一下如何使用 Stream 构建高性能的 Node.js 应用,希望对大家有所帮助!

网页转发平台源码,哪能买到预装Ubuntu,玩命进食有毒爬虫,php postxml,医疗seo工资lzw

易语言源码改dll,Ubuntu进行c编程,在阿里云安装tomcat,hexo反爬虫,php书籍推荐 知乎,淘宝seo排名优化seo博客lzw

当你在键盘上输入字符,从磁盘读取文件或在网上下载文件时,一股信息流(bits)在流经不同的设备和应用。

qq农场外挂 源码,vscode浏览器怎样看,ubuntu更新yaml,tomcat启停脚本,网站植入爬虫,php如何获取内容,金华抖音搜索seo优化排名,选美网站模板lzw

如果你学会处理这些字节流,你将能构建高性能且有价值的应用。例如,试想一下当你在 YouTube 观看视频时,你不需要一直等待直到完整的视频下载完。一旦有一个小缓冲,视频就会开始播放,而剩下的会在你观看时继续下载。

Nodejs 包含一个内置模块stream可以让我们处理流数据。在这篇文章中,我们将通过几个简单的示例来讲解stream的用法,我们也会描述在面对复杂案例构建高性能应用时,应该如何构建管道去合并不同的流。

在我们深入理解应用构建前,理解 Node.jsstream模块提供的特性很重要。

让我们开始吧!

Node.js 流的类型

Node.jsstream提供了四种类型的流

可读流(Readable Streams)可写流(Writable Streams)双工流(Duplex Streams)转换流(Transform Streams)

让我们在高层面来看看每一种流类型吧。

可读流

可读流可以从一个特定的数据源中读取数据,最常见的是从一个文件系统中读取。Node.js 应用中其他常见的可读流用法有:

process.stdin-通过stdin在终端应用中读取用户输入。http.IncomingMessage– 在 HTTP 服务中读取传入的请求内容或者在 HTTP 客户端中读取服务器的 HTTP 响应。

可写流

你可以使用可写流将来自应用的数据写入到特定的地方,比如一个文件。

process.stdout可以用来将数据写成标准输出且被console.log内部使用。

接下来是双工流和转换流,可以被定义为基于可读流和可写流的混合流类型。

双工流

双工流是可读流和可写流的结合,它既可以将数据写入到特定的地方也可以从数据源读取数据。最常见的双工流案例是net.Socket,它被用来从 socket 读写数据。

有一点很重要,双工流中的可读端和可写端的操作是相互独立的,数据不会从一端流向另一端。

转换流

转换流与双工流略有相似,但在转换流中,可读端和可写端是相关联的。

crypto.Cipher类是一个很好的例子,它实现了加密流。通过crypto.Cipher流,应用可以往流的可写端写入纯文本数据并从流的可读端读取加密后的密文。之所以将这种类型的流称之为转换流就是因为其转换性质。

附注:另一个转换流是stream.PassThroughstream.PassThrough从可写端传递数据到可读端,没有任何转换。这听起来可能有点多余,但 Passthrough 流对构建自定义流以及流管道非常有帮助。(比如创建一个流的数据的多个副本)

从可读的 Node.js 流读取数据

一旦可读流连接到生产数据的源头,比如一个文件,就可以用几种方法通过该流读取数据。

首先,先创建一个名为myfile的简单的 text 文件,85 字节大小,包含以下字符串:

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.

现在,我们看下从可读流读取数据的两种不同方式。

1. 监听data事件

从可读流读取数据的最常见方式是监听流发出的data事件。以下代码演示了这种方式:

const fs = require(fs)const readable = fs.createReadStream(./myfile, { highWaterMark: 20 });readable.on(data, (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);})

highWaterMark属性作为一个选项传递给fs.createReadStream,用于决定该流中有多少数据缓冲。然后数据被冲到读取机制(在这个案例中,是我们的data处理程序)。默认情况下,可读fs流的highWaterMark值是 64kb。我们刻意重写该值为 20 字节用于触发多个data事件。

如果你运行上述程序,它会在五个迭代内从myfile中读取 85 个字节。你会在 console 看到以下输出:

Read 20 bytes"Lorem ipsum dolor si"Read 20 bytes"t amet, consectetur "Read 20 bytes"adipiscing elit. Cur"Read 20 bytes"abitur nec mauris tu"Read 5 bytes"rpis."

2. 使用异步迭代器

从可读流中读取数据的另一种方法是使用异步迭代器:

const fs = require(fs)const readable = fs.createReadStream(./myfile, { highWaterMark: 20 });(async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); }})()

如果你运行这个程序,你会得到和前面例子一样的输出。

可读 Node.js 流的状态

当一个监听器监听到可读流的data事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的readableFlowing属性检查流的”流动”状态

我们可以稍微修改下前面的例子,通过data处理器来示范:

const fs = require(fs)const readable = fs.createReadStream(./myfile, { highWaterMark: 20 });let bytesRead = 0console.log(`before attaching data handler. is flowing: ${readable.readableFlowing}`);readable.on(data, (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在从可读流中读取 60 个字节后停止阅读 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒后继续读取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) }})console.log(`after attaching data handler. is flowing: ${readable.readableFlowing}`);

在这个例子中,我们从一个可读流中读取myfile,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了readableFlowing属性的值去理解他是如何变化的。

如果你运行上述程序,你会得到以下输出:

before attaching data handler. is flowing: nullafter attaching data handler. is flowing: trueRead 20 bytesRead 20 bytesRead 20 bytesafter pause() call. is flowing: falseafter resume() call. is flowing: trueRead 20 bytesRead 5 bytes

我们可以用以下来解释输出:

当我们的程序开始时,readableFlowing的值是null,因为我们没有提供任何消耗流的机制。

在连接到data处理器后,可读流变为“流动”模式,readableFlowing变为true

一旦读取 60 个字节,通过调用pause()来暂停流,readableFlowing也转变为false

在等待 1 秒后,通过调用resume(),流再次切换为“流动”模式,readableFlowing改为 `true’。然后剩下的文件内容在流中流动。

通过 Node.js 流处理大量数据

因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。

在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。

但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:

On macOS:mkfile -n 4g 4gb_fileOn Linux:xfs_mkfile 4096m 4gb_file

在我们创建了假文件4gb_file后,让我们在不使用stream模块的情况下来生成来文件的 SHA-256 hash。

const fs = require("fs");const crypto = require("crypto");fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) });});

如果你运行以上代码,你可能会得到以下错误:

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: ERR_FS_FILE_TOO_LARGE}

以上报错之所以发生是因为 JavaScript 运行时无法处理随机的大型缓冲。运行时可以处理的最大尺寸的缓冲取决于你的操作系统结构。你可以通过使用内建的buffer模块里的buffer.constants.MAX_LENGTH变量来查看你操作系统缓存的最大尺寸。

即使上述报错没有发生,在内存中保留大型文件也是有问题的。我们所拥有的可用的物理内存会限制我们应用能使用的内存量。高内存使用率也会造成应用在 CPU 使用方面性能低下,因为垃圾回收会变得昂贵。

使用pipeline()减少 APP 的内存占用

现在,让我们看看如何修改应用去使用流且避免遇到这个报错:

const fs = require("fs");const crypto = require("crypto");const { pipeline } = require("stream");const hashStream = crypto.createHash("sha256");hashStream.setEncoding(ase64)const inputStream = fs.createReadStream("./4gb_file");const outputStream = fs.createWriteStream("./checksum.txt");pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) })

在这个例子中,我们使用crypto.createHash函数提供的流式方法。它返回一个“转换”流对象hashStream,为随机的大型文件生成 hash。

为了将文件内容传输到这个转换流中,我们使用fs.createReadStream4gb_file创建了一个可读流inputStream。我们将hashStream转换流的输出传递到可写流outputStream中,而checksum.txt通过fs.createWriteStream创建的。

如果你运行以上程序,你将看见在checksum.txt文件中看见 4GB 文件的 SHA-256 hash。

对流使用pipeline()pipe()的对比

在前面的案例中,我们使用pipeline函数来连接多个流。另一种常见的方法是使用.pipe()函数,如下所示:

inputStream .pipe(hashStream) .pipe(outputStream)

但这里有几个原因,所以并不推荐在生产应用中使用.pipe()。如果其中一个流被关闭或者出现报错,pipe()不会自动销毁连接的流,这会导致应用内存泄露。同样的,pipe()不会自动跨流转发错误到一个地方处理。

因为这些问题,所以就有了pipeline(),所以推荐你使用pipeline()而不是pipe()来连接不同的流。 我们可以重写上述的pipe()例子来使用pipeline()函数,如下:

pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) })

pipeline()接受一个回调函数作为最后一个参数。任何来自被连接的流的报错都将触发该回调函数,所以可以很轻松的在一个地方处理报错。

总结:使用 Node.js 流降低内存并提高性能

在 Node.js 中使用流有助于我们构建可以处理大型数据的高性能应用。

在这篇文章中,我们覆盖了:

四种类型的 Node.js 流(可读流、可写流、双工流以及转换流)。如何通过监听data事件或者使用异步迭代器来从可读流中读取数据。通过使用pipeline连接多个流来减少内存占用。

一个简短的警告:你很可能不会遇到太多必须使用流的场景,而基于流的方案会提高你的应用的复杂性。务必确保使用流的好处胜于它所带来的复杂性。

nodejs 教学!

如果觉得《聊聊Node.js stream 模块 看看如何构建高性能的应用》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。