我们知道 可读流是作为数据生产者,而可写流作为数据消费者。
那么二者必然是可以结合使用的。即可读流生产出来的数据给可写流消费。
我们这里使用文件可读流和文件可写流来模拟这种情况:
实现很简单,可读流对象通过data事件可以进入流动模式,还是源源不断地生产数据,而可写流对象通过write方法可以写入数据并消费。所以二者结合就实现了生产与消费地结合。
但是我们注意到:文件可读流的缓冲区水位线默认是64KB,而文件可写流的缓冲区水位线默认是16KB。为何官方要设计出这种差异呢?
其实这和磁盘文件的读取和写入速度有关,一般来说,从磁盘中读取文件的速度 要快于 向磁盘中写入文件。
举个例子:抄作业很快,写作业很慢。
而文件可读流其实就是从磁盘中读取文件数据,类似于抄作业,
文件可写流就是向磁盘中写入文件数据,类似于写作业,所以文件可读流速度快于文件可写流。
我再站在生产者,消费者角度思考:
即 生产者的生产速度,快于,消费者的消费速度。
而生产速度和消费速度的不匹配,就会产生一个严重的问题:供大于求。
在经济学上,这是要发生美国奶农倾倒牛奶的事件的。而这是人文社会所不愿意看到的,所以出现了政府干预市场,比如中国一直推行的减排政策,归根揭底还是减少不必要的生产活动。
而在流的世界里,也模拟出了这种减排政策,即背压机制。
即当消费者的消费速度赶不上生产者的生产速度时,就让生产者暂停生产,让消费者能缓一口气去消费。而等消费者消费完后,再通知生产者继续生产。
const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, 'test.txt'), {
highWaterMark: 4 // 可读流缓冲区水位线默认64KB
})
const ws = fs.createWriteStream(path.join(__dirname, 'test2.txt'), {
highWaterMark: 1 // 可写流缓冲区水位线默认16KB
})
rs.on('data', (chunk) => {
let flag = ws.write(chunk)
if (!flag) {
rs.pause()
}
})
ws.on('drain', () => {
rs.resume()
})
监听可读流的data事件,会触发可读流的流动模式,会源源不断地生产数据,生产出来的chunk,会被交给可写流write消费,但是如果write返回false,表示数据量已经超过了缓冲区水位线,如果继续生产下去,可能会造成写流缓冲区发生内存溢出,所以此时需要暂停生产,即rs.pause(),让可读流进入暂停模式。
而当可写流将缓冲区数据消费完了,就会触发drain事件,即表示生产者可以继续生产了,即rs.resume()。
这就是背压机制的基本逻辑。
以上对于背压机制的实现有点公式化,所以针对可读流内置了一个pipe方法,
其内在实现就是背压机制。
Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;
if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
state.multiAwaitDrain = true;
state.awaitDrainWriters = new SafeSet(
state.awaitDrainWriters ? [state.awaitDrainWriters] : []
);
}
}
state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);
const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
dest !== process.stderr;
const endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
process.nextTick(endFn);
else
src.once('end', endFn);
dest.on('unpipe', onunpipe);
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
cleanup();
}
}
}
function onend() {
debug('onend');
dest.end();
}
let ondrain;
let cleanedUp = false;
function cleanup() {
debug('cleanup');
// Cleanup event handlers once the pipe is broken.
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
if (ondrain) {
dest.removeListener('drain', ondrain);
}
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', unpipe);
src.removeListener('data', ondata);
cleanedUp = true;
// If the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
function pause() {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeonDrain(src, dest);
dest.on('drain', ondrain);
}
}
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
const ret = dest.write(chunk);
debug('dest.write', ret);
if (ret === false) {
pause();
}
}
// If the dest has an error, then stop piping into it.
// However, don't suppress the throwing behavior for this.
function onerror(er) {
debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0) {
const s = dest._writableState || dest._readableState;
if (s && !s.errorEmitted) {
// User incorrectly emitted 'error' directly on the stream.
errorOrDestroy(dest, er);
} else {
dest.emit('error', er);
}
}
}
// Make sure our error handler is attached before userland ones.
prependListener(dest, 'error', onerror);
// Both close and finish should trigger unpipe, but only once.
function onclose() {
dest.removeListener('finish', onfinish);
unpipe();
}
dest.once('close', onclose);
function onfinish() {
debug('onfinish');
dest.removeListener('close', onclose);
unpipe();
}
dest.once('finish', onfinish);
function unpipe() {
debug('unpipe');
src.unpipe(dest);
}
// Tell the dest that it's being piped to.
dest.emit('pipe', src);
// Start the flow if it hasn't been started already.
if (dest.writableNeedDrain === true) {
if (state.flowing) {
pause();
}
} else if (!state.flowing) {
debug('pipe resume');
src.resume();
}
return dest;
};
pipe方法可以实现协调数据生成和消费。我们通常将pipe形象的成为管道。
pipe方法的调用者一般是可读流对象,而pipe方法参数一般是可写流对象,当pipe方法调用时,就会触发可写流对象的pipe事件。
pipe事件的回调函数会收到调用pipe事件的可读流对象
pipe方法的返回值是pipe方法参数对应的可写流对象。
pipe的本质作用还是将上游数据传递到下游,pipe就是一个传输管道。
我们也可以通过unpipe方法来切断传输管道,即切断传输。这样就能终端pipe传输数据。



