栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Node.js stream模块(三)背压机制

Node.js stream模块(三)背压机制

我们知道 可读流是作为数据生产者,而可写流作为数据消费者。

那么二者必然是可以结合使用的。即可读流生产出来的数据给可写流消费。

我们这里使用文件可读流和文件可写流来模拟这种情况:

实现很简单,可读流对象通过data事件可以进入流动模式,还是源源不断地生产数据,而可写流对象通过write方法可以写入数据并消费。所以二者结合就实现了生产与消费地结合。

但是我们注意到:文件可读流的缓冲区水位线默认是64KB,而文件可写流的缓冲区水位线默认是16KB。为何官方要设计出这种差异呢?

其实这和磁盘文件的读取和写入速度有关,一般来说,从磁盘中读取文件的速度 要快于 向磁盘中写入文件。

举个例子:抄作业很快,写作业很慢。

而文件可读流其实就是从磁盘中读取文件数据,类似于抄作业,

文件可写流就是向磁盘中写入文件数据,类似于写作业,所以文件可读流速度快于文件可写流。

我再站在生产者,消费者角度思考:

即 生产者的生产速度,快于,消费者的消费速度。

而生产速度和消费速度的不匹配,就会产生一个严重的问题:供大于求。

在经济学上,这是要发生美国奶农倾倒牛奶的事件的。而这是人文社会所不愿意看到的,所以出现了政府干预市场,比如中国一直推行的减排政策,归根揭底还是减少不必要的生产活动。

而在流的世界里,也模拟出了这种减排政策,即背压机制。

即当消费者的消费速度赶不上生产者的生产速度时,就让生产者暂停生产,让消费者能缓一口气去消费。而等消费者消费完后,再通知生产者继续生产。

const fs = require('fs')
const path = require('path')


const rs = fs.createReadStream(path.join(__dirname, 'test.txt'), {
  highWaterMark: 4  // 可读流缓冲区水位线默认64KB
})

const ws = fs.createWriteStream(path.join(__dirname, 'test2.txt'), {
  highWaterMark: 1 // 可写流缓冲区水位线默认16KB
})

rs.on('data', (chunk) => {
  let flag = ws.write(chunk)
  if (!flag) {
    rs.pause()
  }
})

ws.on('drain', () => {
  rs.resume()
})

监听可读流的data事件,会触发可读流的流动模式,会源源不断地生产数据,生产出来的chunk,会被交给可写流write消费,但是如果write返回false,表示数据量已经超过了缓冲区水位线,如果继续生产下去,可能会造成写流缓冲区发生内存溢出,所以此时需要暂停生产,即rs.pause(),让可读流进入暂停模式。

而当可写流将缓冲区数据消费完了,就会触发drain事件,即表示生产者可以继续生产了,即rs.resume()。

这就是背压机制的基本逻辑。

以上对于背压机制的实现有点公式化,所以针对可读流内置了一个pipe方法,

其内在实现就是背压机制。

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  if (state.pipes.length === 1) {
    if (!state.multiAwaitDrain) {
      state.multiAwaitDrain = true;
      state.awaitDrainWriters = new SafeSet(
        state.awaitDrainWriters ? [state.awaitDrainWriters] : []
      );
    }
  }

  state.pipes.push(dest);
  debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        cleanup();
      }
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  let ondrain;

  let cleanedUp = false;
  function cleanup() {
    debug('cleanup');
    // Cleanup event handlers once the pipe is broken.
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    if (ondrain) {
      dest.removeListener('drain', ondrain);
    }
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);

    cleanedUp = true;

    // If the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (ondrain && state.awaitDrainWriters &&
        (!dest._writableState || dest._writableState.needDrain))
      ondrain();
  }

  function pause() {
    // If the user unpiped during `dest.write()`, it is possible
    // to get stuck in a permanently paused state if that write
    // also returned false.
    // => Check whether `dest` is still a piping destination.
    if (!cleanedUp) {
      if (state.pipes.length === 1 && state.pipes[0] === dest) {
        debug('false write response, pause', 0);
        state.awaitDrainWriters = dest;
        state.multiAwaitDrain = false;
      } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
        debug('false write response, pause', state.awaitDrainWriters.size);
        state.awaitDrainWriters.add(dest);
      }
      src.pause();
    }
    if (!ondrain) {
      // When the dest drains, it reduces the awaitDrain counter
      // on the source.  This would be more elegant with a .once()
      // handler in flow(), but adding and removing repeatedly is
      // too slow.
      ondrain = pipeonDrain(src, dest);
      dest.on('drain', ondrain);
    }
  }

  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      pause();
    }
  }

  // If the dest has an error, then stop piping into it.
  // However, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    if (EE.listenerCount(dest, 'error') === 0) {
      const s = dest._writableState || dest._readableState;
      if (s && !s.errorEmitted) {
        // User incorrectly emitted 'error' directly on the stream.
        errorOrDestroy(dest, er);
      } else {
        dest.emit('error', er);
      }
    }
  }

  // Make sure our error handler is attached before userland ones.
  prependListener(dest, 'error', onerror);

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);

  // Start the flow if it hasn't been started already.

  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};

pipe方法可以实现协调数据生成和消费。我们通常将pipe形象的成为管道。

pipe方法的调用者一般是可读流对象,而pipe方法参数一般是可写流对象,当pipe方法调用时,就会触发可写流对象的pipe事件。

pipe事件的回调函数会收到调用pipe事件的可读流对象

pipe方法的返回值是pipe方法参数对应的可写流对象。

pipe的本质作用还是将上游数据传递到下游,pipe就是一个传输管道。

我们也可以通过unpipe方法来切断传输管道,即切断传输。这样就能终端pipe传输数据。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/713934.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号