您可以使用 async 库(npm 包 `async`)来解决此问题。以下模式适用于任何可读流。
var AsyncLib = require('async');

/**
 * Handles a single payload taken from the queue.
 *
 * @param {*} payload - The item that was pushed onto the queue.
 * @param {function(Error=)} cb - Must be called exactly once when the work
 *     for this payload is finished; pass an error to report a failure.
 */
var worker = function (payload, cb) {
  // do something with payload and call callback
  return cb();
};

// Upper bound on how many payloads are handed to `worker` at the same time.
var concurrency = 5;

/**
 * Pushes one task per 'data' event of `stream` onto an async queue and
 * invokes `callback` once the stream has ended and every pushed task has
 * completed.
 *
 * Completion is tracked with a local counter fed by the per-task callback of
 * `queue.push` instead of the queue's drain hook. Registering a drain handler
 * only after 'end' misses the case where the queue emptied before 'end' fired
 * (including a stream that emitted no data at all), and the way a drain
 * handler is registered differs between async v2 (property assignment) and
 * async v3 (method call).
 *
 * @param {object} stream - Readable stream (anything emitting 'data', 'end'
 *     and 'error' and returning itself from `.on`).
 * @param {function(Error=)} callback - Called once: with no arguments on
 *     success, or with the first stream/worker error. Tasks that are already
 *     queued keep running after an error has been reported.
 */
function processStream(stream, callback) {
  var streamQueue = AsyncLib.queue(worker, concurrency);
  var pending = 0; // tasks pushed but not yet completed
  var ended = false; // true once the stream emitted 'end'
  var finished = false; // guards against calling `callback` twice

  function finish(err) {
    if (finished) {
      return;
    }
    finished = true;
    if (err) {
      callback(err);
    } else {
      callback();
    }
  }

  function onTaskDone(err) {
    pending -= 1;
    if (err) {
      finish(err);
      return;
    }
    if (ended && pending === 0) {
      finish();
    }
  }

  stream
    .on('data', function (data) {
      // No need to pause and resume the stream: the queue holds tasks until
      // a worker slot is free. Note that this means waiting tasks accumulate
      // in memory when the stream produces faster than the workers consume.
      var payload = '//some payload'; // placeholder: build the payload from `data`
      pending += 1;
      streamQueue.push(payload, onTaskDone);
    })
    .on('error', finish)
    .on('end', function () {
      ended = true;
      // Everything may already be done (or nothing was ever pushed); in that
      // case no further task callback will arrive, so finish right here.
      if (pending === 0) {
        finish();
      }
    });
}


