目录
数据消费者
可写流Writable
_write方法
write方法
end方法
write方法执行流程
两个问题的思考
drain事件
数据消费者
前面介绍了stream模块的可读流stream.Readable,通过可读流的工作模式,我们可以知道可读流其实是数据生产者,而且必须要有消费者通过监听data事件或者监听readable事件调用read方法去消费数据,才能触发可读流进行数据生产。即消费拉动生产。
我们需要搞清楚上面程序中,谁是消费者。
监听data事件?data事件回调函数?console.log?
其实纯粹的消费者定义,应该是 单纯消费数据的人。消费者不应该操心数据的生产工作,而是只关注数据的消费。所以上面console.log算是一个消费者。
那么监听data事件算是什么呢?
监听data事件能够取出可读流对象缓冲区中的数据给自身回调函数作为参数,如果没有消费者消费,则数据被丢弃。
监听data事件的行为像是一个配送员:
消费者网上下单到某商店购买商品,被一个配送员接单,然后配送员到达商店,催出商店给货,商店库存发现商品不够就回去工厂调货,调到货后,加入商店库存,或者直接交给配送员,配送员开开心心的送货,结果发现是个假单,没有消费者,而配送员无法消费商品,所以直接丢了。
console.log 才是真正的数据消费者,我们知道console.log可以将入参转为字符串后在控制台打印。而控制台打印算是标准输出流。所以其实真正消费者是process.stdout。而process.stdout就是一个可写流对象。那么可写流对象如何进行数据消费呢?我们又如何创建一个自定义的可写流对象呢?
可写流Writable
在stream模块下,有一个Writable抽象类,它有一个抽象方法_write,我们一般用自定义类继承stream.Writable,并重写_write方法。
_write方法
在可写流对象中,有一个缓冲区用于存储将被消费的数据,而_write的作用就是接收缓冲区中的数据并消费
write方法
那么可写流对象的缓冲区的中数据来自于哪呢?Writable原型上有一个write方法,我们通过可写流对象调用write方法可以向缓冲区中写入数据。
write方法可以执行多次,即表示可以向可写流缓冲区中写入多次
write方法每次写入都一个返回值true或者false,
true表示写入本次数据后,缓冲区数据还没有达到highWaterMark水位线,不存在内存溢出风险,
false表示写入本次数据后,缓冲区数据量已经超过了highWaterMark水位线,继续写入可能会造成内存溢出风险了,但是并不会阻止write继续写入数据。
另外write方法的参数只能是字符串,Buffer对象,字节数组
end方法
那么可写流对象如何知道不会再有数据写入了呢?即如何标志写入结束呢?
Writable原型上有一个end方法,当可写流对象调用end方法后,标志可写流对象停止写入数据,之后再调用write方法就会报错。
end方法参数和write方法一致。但是end方法返回值是当前可写流对象。
write方法执行流程
Writable.prototype.write = function(chunk, encoding, cb) {
return _write(this, chunk, encoding, cb) === true;
};
function _write(stream, chunk, encoding, cb) {
const state = stream._writableState;
if (typeof encoding === 'function') {
cb = encoding;
encoding = state.defaultEncoding;
} else {
if (!encoding)
encoding = state.defaultEncoding;
else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
throw new ERR_UNKNOWN_ENCODING(encoding);
if (typeof cb !== 'function')
cb = nop;
}
if (chunk === null) {
throw new ERR_STREAM_NULL_VALUES();
} else if (!state.objectMode) {
if (typeof chunk === 'string') {
if (state.decodeStrings !== false) {
chunk = Buffer.from(chunk, encoding);
encoding = 'buffer';
}
} else if (chunk instanceof Buffer) {
encoding = 'buffer';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = 'buffer';
} else {
throw new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
}
let err;
if (state.ending) {
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
err = new ERR_STREAM_DESTROYED('write');
}
if (err) {
process.nextTick(cb, err);
errorOrDestroy(stream, err, true);
return err;
}
state.pendingcb++;
return writeOrBuffer(stream, state, chunk, encoding, cb);
}
// If we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
const len = state.objectMode ? 1 : chunk.length;
state.length += len;
// stream._write resets state.length
const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
}
if (state.allNoop && callback !== nop) {
state.allNoop = false;
}
} else {
state.writelen = len;
state.writecb = callback;
state.writing = true;
state.sync = true;
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
}
走读以上代码,我们可以分析出:write方法写入的数据{chunk, encoding, callback}可能会被缓存在state.buffered中,即可写流对象缓冲区中,也可能直接被传递给可写流对象重写的_write方法直接消费。即上面WriteOrBuffer方法的逻辑。
WriteOrBuffer方法中如果满足下面条件
if (state.writing || state.corked || state.errored || !state.constructed)
则,write方法写入的数据会被缓存在state.buffered中,否则,直接传给_write消费。
| 可读流对象_writableState下的属性 | 初值(可读流对象刚创建时) |
| writing | false |
| corked | 0 |
| errored | null |
| constructed | true |
所以默认情况下,上面判断条件为false,即write写入的数据不会被缓存到state.buffered中,而是会被_write消费。
而在write方法执行过程中,只有state.writing存在被置为true的机会,就是_write方法执行时
但是当_write回调执行时,即state.onwrite执行时,state.writing会被修改为false。
那必然存在一种情况:如果_write的回调state.onwrite是异步执行的,则state.writing的true就不会在同步代码执行时被置为false。比如下面,我们将在重写的_write中,将callback(对应state.onwrite)加入异步任务队列中
则_write执行后,state.writing还是为true,所以下一次write可以进入缓存流程
而这也是保证多次write的话,write的内容按顺序消费的逻辑。后面的write的数据{chunk, encoding, callback}必须要等前面的write的数据被消费掉才能被消费。
但是state.buffered缓冲区是有大小限制的,同时为了防止内存溢出,state.buffered存在水位线highWaterMark用来实时比较,当state.buffered中的数据量state.length超过了highWaterMark,write方法就会返回false,来通知调用write的人。
此时,虽然可以继续用write向state.buffered中继续添加数据,但是很不保险,随时有内存溢出的风险。所以推荐的做法是,当write返回false时,暂停后续write,先将state.buffered中的数据消费完。
两个问题的思考
此时有两个问题:
1、state.buffered缓冲区中的数据如何消费?
2、state.buffered缓冲区中数据消费完了,如何通知外界(生产者)进行后续write?
我们知道write流程中,只有_write可以消费数据,且第一次write必然走非缓存,即会触发_write,第二次write可能会走缓存。那么第二次write缓存的数据如何消费呢?
第一次write必走非缓存的_write,而此时传递给_write的回调并不是我们传入的callback,而是state.onwrite。下面是onwrite的代码。
function onwrite(stream, er) {
const state = stream._writableState;
const sync = state.sync;
const cb = state.writecb;
if (typeof cb !== 'function') {
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}
state.writing = false;
state.writecb = null;
state.length -= state.writelen;
state.writelen = 0;
if (er) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
er.stack; // eslint-disable-line no-unused-expressions
if (!state.errored) {
state.errored = er;
}
// In case of duplex streams we need to notify the readable side of the
// error.
if (stream._readableState && !stream._readableState.errored) {
stream._readableState.errored = er;
}
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
if (state.buffered.length > state.bufferedIndex) {
clearBuffer(stream, state);
}
if (sync) {
// It is a common case that the callback passed to .write() is always
// the same. In that case, we do not schedule a new nextTick(), but
// rather just increase a counter, to improve performance and avoid
// memory allocations.
if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
}
} else {
afterWrite(stream, state, 1, cb);
}
}
}
state.onwrite如果被传入_write后同步执行,则state.writing会同步变为false,则第二次write会继续走非缓存消费。
state.onwrite如果被传入_write后异步执行,则state.writing在同步执行过程中继续保持true,则第二次write会走缓存。
所以第二次write数据进state.buffered缓冲区的话,则说明state.onwrite是异步执行的。
当同步代码执行完后,state.onwrite会被从异步任务队列中被事件循环捞出来执行。
通过debug可以发现onwrite之后进入了clearBuffer方法,通过名字就可以看出clearBuffer适用于清空state.buffered的。而clearBuffer会调用doWrite,doWrite会调用_write进行数据消费。
所以state.buffered缓冲区中的数据虽然是同步加入的,但是都是异步被消费的。验证如下:
drain事件
而当state.buffered中数据被清空完,即state.onwrite执行完前,都会执行afterWrite方法
function afterWrite(stream, state, count, cb) {
const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
state.needDrain;
if (needDrain) {
state.needDrain = false;
stream.emit('drain');
}
while (count-- > 0) {
state.pendingcb--;
cb();
}
if (state.destroyed) {
errorBuffer(state);
}
finishMaybe(stream, state);
}
在afterWrite方法会判断state.needDrain,若为true(且可写流非结束态)则触发drain事件。
需要注意的是,state.needDrain是在缓冲区已经清空后,作为是否触发drain事件的前提判断,并不是state.needDrain触发的缓冲区清空行为。
state.needDrain是当 缓冲区数据量超过水位线时被置为true的。
所以整体流程应该是:
第一次write写入的数据{chunk, encoding, callback},被同步消费,即同步调用_write,但是传入_write的参数是{chunk, encoding, state.onwrite},即回调参数已经被改为了state.onwrite,该回调方法如果在_write中被异步执行,则会导致第二次write写入的数据会被加入state.buffered,即可写流的缓冲区中,等待异步消费。
当同步代码执行完后,异步执行第一次_write的state.onwrite,该方法会触发清空state.buffered数据去给_write消费的操作clearBuffer,而当state.onwrite执行完前,会调用afterWrite,判断是否需要触发drain事件,当state.needDrain为true时,且可写流非结束态时,就会触发drain事件。
而drain事件正是可写流通知外界可以继续write的方式



