Node.js Readable Stream的实现简析

时间:2020-09-18 19:17:17


Node.js Readable Stream的实现简析



Readable Stream是对数据源的一种抽象。它提供了从数据源获取数据并缓存,以及将数据提供给数据消费者的能力。

接下来分别通过Readable Stream的2种模式来学习下可读流是如何获取数据以及将数据提供给消费者的。




// Demo (flowing mode): a readable stream that produces one character every
// 100 ms and is piped to standard output.
const { Readable } = require('stream')

// Start one below 97 because the character code is pre-incremented before
// each push.
let c = 97 - 1

// Create a readable stream
const rs = new Readable({
  read () {
    // Once 'z' has been produced, push null to signal the end of the data
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      // Push the next character into the readable stream
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})

// Pipe the readable stream's data to standard output so it gets printed
rs.pipe(process.stdout)

process.on('exit', () => {
  console.error('\n_read() called ' + (c - 97) + ' times')
})


/**
 * Readable stream constructor (works with or without `new`).
 *
 * @param {Object} [options] - Stream options. `options.read` and
 *   `options.destroy`, when they are functions, replace the instance's
 *   internal `_read` / `_destroy` implementations.
 */
function Readable(options) {
  // Allow calling Readable(options) without `new`
  if (!(this instanceof Readable))
    return new Readable(options);

  // _readableState stores the state flags the stream goes through during its
  // life cycle
  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    // Override the internal _read method to customise how data is fetched
    // from the data source
    if (typeof options.read === 'function')
      this._read = options.read;

    // Override the internal _destroy method
    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this);
}


function ReadableState(options, stream) {options = options || {};...// object stream flag. Used to make read(n) ignore n and to// make all the buffer merging and length checks go away// 是否为对象模式,如果是的话,那么从缓冲区获得的数据为对象this.objectMode = !!options.objectMode;if (isDuplex)this.objectMode = this.objectMode || !!options.readableObjectMode;// the point at which it stops calling _read() to fill the buffer// Note: 0 is a valid value, means "don't call _read preemptively ever"// 高水位线,一旦buffer缓冲区的数据量大于hwm时,就会停止调用从数据源再获取数据var hwm = options.highWaterMark;var readableHwm = options.readableHighWaterMark;var defaultHwm = this.objectMode ? 16 : 16 * 1024; // 默认值if (hwm || hwm === 0)this.highWaterMark = hwm;else if (isDuplex && (readableHwm || readableHwm === 0))this.highWaterMark = readableHwm;elsethis.highWaterMark = defaultHwm;// cast to ints.this.highWaterMark = Math.floor(this.highWaterMark);// A linked list is used to store data chunks instead of an array because the// linked list can remove elements from the beginning faster than// array.shift()// readable可读流内部的缓冲区this.buffer = new BufferList();// 缓冲区数据长度this.length = 0;this.pipes = null;this.pipesCount = 0;// flowing模式的初始值this.flowing = null;// 是否已将源数据全部读取完毕this.ended = false;// 是否触发了end事件this.endEmitted = false;// 是否正在从源数据处读取数据到缓冲区this.reading = false;// a flag to be able to tell if the event 'readable'/'data' is emitted// immediately, or on a later tick. We set this to true at first, because// any actions that shouldn't happen until "later" should generally also// not happen before the first read call.this.sync = true;// whenever we return null, then we set a flag to say// that we're awaiting a 'readable' event emission.this.needReadable = false;this.emittedReadable = false;this.readableListening = false;this.resumeScheduled = false;// has it been destroyedthis.destroyed = false;// Crypto is kind of old and crusty. 
Historically, its default string// encoding is 'binary' so we have to make this configurable.// Everything else in the universe uses 'utf8', though.// 编码方式this.defaultEncoding = options.defaultEncoding || 'utf8';// 在pipe管道当中正在等待drain事件的写入流// the number of writers that are awaiting a drain event in .pipe()sthis.awaitDrain = 0;// if true, a maybeReadMore has been scheduledthis.readingMore = false;this.decoder = null;this.encoding = null;if (options.encoding) {if (!StringDecoder)StringDecoder = require('string_decoder').StringDecoder;this.decoder = new StringDecoder(options.encoding);this.encoding = options.encoding;}}复制代码


Readable.prototype.pipe = function (dest, pipeOpts) {var src = thisvar state = this._readableState...// 可读流实例监听data,可读流会从数据源获取数据,同时数据被传递到了消费者src.on('data', ondata)function ondata (chunk) {...var ret = dest.write(chunk)...}...}复制代码

Node提供的可读流有3种方式可以将初始态flowing = null的可读流转化为flowing = true:监听data事件;调用stream.resume()方法;调用stream.pipe()方法将数据发送到Writable。


事实上这3种方式都回归到了一种方式上:stream.resume(),通过调用这个方法,将可读流的模式改变为flowing态。继续回到上面的例子当中,在调用了rs.pipe()方法后,实际上内部是调用了src.on('data', ondata)监听data事件,那么我们就来看下这个方法当中做了哪些工作。

Readable.prototype.on = function (ev, fn) {...// 监听data事件if (ev === 'data') {// 可读流一开始的flowing状态是null// Start flowing on next tick if stream isn't explicitly pausedif (this._readableState.flowing !== false)this.resume();} else if (ev === 'readable') {...}return res;}复制代码


/**
 * Switch the stream into flowing mode (no-op when it is already flowing).
 *
 * @returns {Readable} this, to allow chaining
 */
Readable.prototype.resume = function() {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    // Mark the stream as flowing
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

// Schedule resume_ on the next tick; resumeScheduled ensures it is queued at
// most once until it has run.
function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

// Deferred part of resume(): kick off reading, emit 'resume' and start the
// flow loop.
function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    // Start fetching data from the data source
    stream.read(0);
  }

  state.resumeScheduled = false;
  // Reset awaitDrain unconditionally
  state.awaitDrain = 0;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}


Readable.prototype.read = function (n) {...if (n === 0 &&state.needReadable &&(state.length >= state.highWaterMark || state.ended)) {debug('read: emitReadable', state.length, state.ended);// 如果缓存中没有数据且处于end状态if (state.length === 0 && state.ended)// 流状态结束endReadable(this);else// 触发readable事件emitReadable(this);return null;}...// 从缓存中可以读取的数据n = howMuchToRead(n, state);// 判断是否应该从数据源中获取数据// if we need a readable event, then we need to do some reading.var doRead = state.needReadable;debug('need readable', doRead);// if we currently have less than the highWaterMark, then also read some// 如果buffer的长度为0或者buffer的长度减去需要读取的数据的长度 < hwm 的时候,那么这个时候还需要继续读取数据// state.length - n 即表示当前buffer已有的数据长度减去需要读取的数据长度后,如果还小于hwm话,那么doRead仍然置为trueif (state.length === 0 || state.length - n < state.highWaterMark) {// 继续read数据doRead = true;debug('length less than watermark', doRead);}// however, if we've ended, then there's no point, and if we're already// reading, then it's unnecessary.// 如果数据已经读取完毕,或者处于正在读取的状态,那么doRead置为false表明不需要读取数据if (state.ended || state.reading) {doRead = false;debug('reading or ended', doRead);} else if (doRead) {debug('do read');state.reading = true;state.sync = true;// if the length is currently zero, then we *need* a readable event.// 如果当前缓冲区的长度为0,首先将needReadable置为true,那么再当缓冲区有数据的时候就触发readable事件if (state.length === 0)state.needReadable = true;// call internal read method// 从数据源获取数据,可能是同步也可能是异步的状态,这个取决于自定义_read方法的内部实现,可参见study里面的示例代码this._read(state.highWaterMark);state.sync = false;// If _read pushed data synchronously, then `reading` will be false,// and we need to re-evaluate how much data we can return to the user.// 如果_read方法是同步,那么reading字段将会为false。这个时候需要重新计算有多少数据需要重新返回给消费者if (!state.reading)n = howMuchToRead(nOrig, state);}// ret为输出给消费者的数据var ret;if (n > 0)ret = fromList(n, state);elseret = null;if (ret === null) {state.needReadable = true;n = 0;} else {state.length -= n;}if (state.length === 0) {// If we have nothing in the buffer, then we want to know// as soon 
as we *do* get something into the buffer.if (!state.ended)state.needReadable = true;// If we tried to read() past the EOF, then emit end on the next tick.if (nOrig !== n && state.ended)endReadable(this);}// 只要从数据源获取的数据不为null,即未EOF时,那么每次读取数据都会触发data事件if (ret !== null)this.emit('data', ret);return ret;}复制代码


// The custom read() implementation: this is where the data source feeds the
// stream.
const rs = new Readable({
  read () {
    // Once 'z' has been produced, push null to signal the end of the data
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      // Push the next character into the readable stream
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})


Readable.prototype.push = function (chunk, encoding) {....// 对从数据源拿到的数据做处理return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);}function readableAddChunk (stream, chunk, encoding, addToFront, skipChunkCheck) {... // 是否添加数据到头部if (addToFront) {// 如果不能在写入数据if (state.endEmitted)stream.emit('error',new errors.Error('ERR_STREAM_UNSHIFT_AFTER_END_EVENT'));elseaddChunk(stream, state, chunk, true);} else if (state.ended) { // 已经EOF,但是仍然还在推送数据,这个时候会报错stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));} else {// 完成一次读取后,立即将reading的状态置为falsestate.reading = false;if (state.decoder && !encoding) {chunk = state.decoder.write(chunk);if (state.objectMode || chunk.length !== 0)// 添加数据到尾部addChunk(stream, state, chunk, false);elsemaybeReadMore(stream, state);} else {// 添加数据到尾部addChunk(stream, state, chunk, false);}}...return needMoreData(state);}// 根据stream的状态来对数据做处理function addChunk(stream, state, chunk, addToFront) {// flowing为readable stream的状态,length为buffer的长度// flowing模式下且为异步读取数据的过程时,可读流的缓冲区并不保存数据,而是直接获取数据后触发data事件供消费者使用if (state.flowing && state.length === 0 && !state.sync) {// 对于flowing模式的Reabable,可读流自动从系统底层读取数据,直接触发data事件,且继续从数据源读取数据stream.read(0)stream.emit('data', chunk);// 继续从缓存池中获取数据stream.read(0);} else {// update the buffer info.// 数据的长度state.length += state.objectMode ? 1 : chunk.length;// 将数据添加到头部if (addToFront)state.buffer.unshift(chunk);else// 将数据添加到尾部state.buffer.push(chunk);// 触发readable事件,即通知缓存当中现在有数据可读if (state.needReadable)emitReadable(stream);}maybeReadMore(stream, state);}复制代码






function resume_() {...// flow(stream);if (state.flowing && !state.reading)stream.read(0); // 继续从数据源获取数据}function flow(stream) {...// 如果处理flowing状态,那么调用stream.read()方法用以从stream的缓冲区中获取数据并供消费者来使用while (state.flowing && stream.read() !== null);}复制代码






// Demo (paused mode): consume the stream through the 'readable' event and
// explicit rs.read() calls.
const { Readable } = require('stream')

// Start one below 97 because the character code is pre-incremented before
// each push.
let c = 97 - 1

const rs = new Readable({
  highWaterMark: 3,
  read () {
    // Once 'f' has been produced, push null to signal the end of the data
    if (c >= 'f'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      rs.push(String.fromCharCode(++c))
    }, 1000)
  }
})

rs.setEncoding('utf8')

rs.on('readable', () => {
  // console.log(rs._readableState.length)
  console.log('get the data from readable: ', rs.read())
})


Readable.prototype.on = function (env) {if (env === 'data') {...} else if (env === 'readable') {// 监听readable事件const state = this._readableState;if (!state.endEmitted && !state.readableListening) {state.readableListening = state.needReadable = true;state.emittedReadable = false;if (!state.reading) {process.nextTick(nReadingNextTick, this);} else if (state.length) {emitReadable(this);}}}}function nReadingNextTick(self) {debug('readable nexttick read 0');// 开始从数据源获取数据self.read(0);}复制代码


function addChunk(stream, state, chunk, addToFront) {if (state.flowing && state.length === 0 && !state.sync) {...} else {// update the buffer info.// 数据的长度state.length += state.objectMode ? 1 : chunk.length;// 将数据添加到头部if (addToFront)state.buffer.unshift(chunk);else// 将数据添加到尾部state.buffer.push(chunk);// 触发readable事件,即通知缓存当中现在有数据可读if (state.needReadable)emitReadable(stream);}maybeReadMore(stream, state);}复制代码


// Schedule maybeReadMore_ on the next tick; readingMore ensures it is queued
// at most once until it has run.
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

// Keep calling stream.read(0) while the stream is not flowing, not already
// reading, not ended, and the buffered length is below the high water mark.
function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // didn't get any data, stop spinning.
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}





Readable.prototype.pipe = function () {...// 监听drain事件var ondrain = pipeOnDrain(src);dest.on('drain', ondrain);...src.on('data', ondata)function ondata () {increasedAwaitDrain = false;// 向writable中写入数据var ret = dest.write(chunk);if (false === ret && !increasedAwaitDrain) {...src.pause();}}...}function pipeOnDrain(src) {return function() {var state = src._readableState;debug('pipeOnDrain', state.awaitDrain);// 减少pipes中awaitDrain的数量if (state.awaitDrain)state.awaitDrain--;// 如果awaitDrain的数量为0,且readable上绑定了data事件时(EE.listenerCount返回绑定的事件回调数量)if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {// 重新开启flowing模式state.flowing = true;flow(src);}};}复制代码



