栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

使用Mongoose模式导入CSV

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Mongoose模式导入CSV

您可以通过

headers
从模式定义获取来使用fast-
csv进行操作,该模式定义会将已解析的行作为“对象”返回。您实际上有一些不匹配,因此我已将它们标记为更正:

const fs = require('mz/fs');const csv = require('fast-csv');const { Schema } = mongoose = require('mongoose');const uri = 'mongodb://localhost/test';mongoose.Promise = global.Promise;mongoose.set('debug', true);const rankSchema = new Schema({  serverid: Number,  resetid: Number,  rank: Number,  name: String,  land: String,         // <-- You have this as Number but it's a string  networth: Number,  tag: String,  stuff: String,        // the empty field in the csv  gov: String,  gdi: Number,  protection: Number,  vacation: Number,  alive: Number,  deleted: Number});const Rank = mongoose.model('Rank', rankSchema);const log = data => console.log(JSON.stringify(data, undefined, 2));(async function() {  try {    const conn = await mongoose.connect(uri);    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));    let headers = Object.keys(Rank.schema.paths)      .filter(k => ['_id','__v'].indexOf(k) === -1);    console.log(headers);    await new Promise((resolve,reject) => {      let buffer = [],          counter = 0;      let stream = fs.createReadStream('input.csv')        .pipe(csv({ headers }))        .on("error", reject)        .on("data", async doc => {          stream.pause();          buffer.push(doc);          counter++;          log(doc);          try { if ( counter > 10000 ) {   await Rank.insertMany(buffer);   buffer = [];   counter = 0; }          } catch(e) { stream.destroy(e);          }          stream.resume();        })        .on("end", async () => {          try { if ( counter > 0 ) {   await Rank.insertMany(buffer);   buffer = [];   counter = 0;   resolve(); }          } catch(e) { stream.destroy(e);          }        });    });  } catch(e) {    console.error(e)  } finally {    process.exit()  }})()

只要该模式实际上与提供的CSV对齐,就可以了。这些是我可以看到的更正,但是如果您需要对实际字段名称进行不同的对齐,则需要进行调整。但是基本上

Number
在一个位置上有一个
String
,实际上是一个额外的字段,我认为这是CSV中的空白字段。

通常的事情是从架构中获取字段名称数组,并在制作csv解析器实例时将其传递给选项:

let headers = Object.keys(Rank.schema.paths)  .filter(k => ['_id','__v'].indexOf(k) === -1);let stream = fs.createReadStream('input.csv')  .pipe(csv({ headers }))

实际执行此操作后,您将获得“对象”而不是数组:

{  "serverid": "9",  "resetid": "1557",  "rank": "358",  "name": "286",  "land": "Mutantville",  "networth": "4368",  "tag": "2358026",  "stuff": "",  "gov": "M",  "gdi": "0",  "protection": "0",  "vacation": "0",  "alive": "1",  "deleted": "0"}

不要担心“类型”,因为Mongoose会根据模式强制转换值。

其余发生在

data
事件的处理程序中。为了获得最大的效率,我们
insertMany()
仅每10,000行写入一次数据库。它实际如何到达服务器和进程取决于MongoDB版本,但是根据内存使用情况的“权衡”并编写一个单据,您将为单个集合导入的平均字段数应该是10,000,这是相当合理的。合理的网络请求。如有必要,请减小数字。

重要的部分是将这些调用标记为

async
函数,然后
await
将其标记为结果
insertMany()
。另外,我们还需要进行
pause()
流处理,
resume()
否则需要对每个项目进行操作,否则就有可能
buffer
在实际发送文档之前覆盖要插入的文档。在
pause()
resume()
有必要把“反压”在管道上,否则项目只保留“出山”和烧制的
data
事件。

自然地,对于10,000个条目的控件要求我们在每次迭代和流完成时都进行检查,以清空缓冲区并将所有剩余文档发送到服务器。

那确实是您要执行的操作,因为您当然不希望在整个

data
事件的“每次”迭代中或者基本上不等待每个请求完成就向服务器触发异步请求。您无需检查“非常小的文件”就可以摆脱困境,但是对于任何实际负载,由于“正在进行中”的异步调用尚未完成,因此您肯定会超出调用堆栈。


仅供参考-

package.json
用过的。的
mz
,因为它只是一个现代化的可选
Promise
标准节点的启用库“内置”库,我只是习惯于使用。该代码当然可以与该
fs
模块完全互换。

{  "description": "",  "main": "index.js",  "dependencies": {    "fast-csv": "^2.4.1",    "mongoose": "^5.1.1",    "mz": "^2.7.0"  },  "keywords": [],  "author": "",  "license": "ISC"}

实际上,使用Node v8.9.x及更高版本,我们甚至可以

AsyncIterator
通过
stream-to-iterator
模块的实现来简化此过程。它仍然处于
Iterator<Promise<T>>
模式下,但是应该在Node
v10.x变为稳定LTS之前执行:

const fs = require('mz/fs');const csv = require('fast-csv');const streamToIterator = require('stream-to-iterator');const { Schema } = mongoose = require('mongoose');const uri = 'mongodb://localhost/test';mongoose.Promise = global.Promise;mongoose.set('debug', true);const rankSchema = new Schema({  serverid: Number,  resetid: Number,  rank: Number,  name: String,  land: String,  networth: Number,  tag: String,  stuff: String,        // the empty field  gov: String,  gdi: Number,  protection: Number,  vacation: Number,  alive: Number,  deleted: Number});const Rank = mongoose.model('Rank', rankSchema);const log = data => console.log(JSON.stringify(data, undefined, 2));(async function() {  try {    const conn = await mongoose.connect(uri);    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));    let headers = Object.keys(Rank.schema.paths)      .filter(k => ['_id','__v'].indexOf(k) === -1);    //console.log(headers);    let stream = fs.createReadStream('input.csv')      .pipe(csv({ headers }));    const iterator = await streamToIterator(stream).init();    let buffer = [],        counter = 0;    for ( let docPromise of iterator ) {      let doc = await docPromise;      buffer.push(doc);      counter++;      if ( counter > 10000 ) {        await Rank.insertMany(buffer);        buffer = [];        counter = 0;      }    }    if ( counter > 0 ) {      await Rank.insertMany(buffer);      buffer = [];      counter = 0;    }  } catch(e) {    console.error(e)  } finally {    process.exit()  }})()

基本上,所有流“事件”的处理,暂停和恢复都将替换为一个简单的

for
循环:

const iterator = await streamToIterator(stream).init();for ( let docPromise of iterator ) {  let doc = await docPromise;  // ... The things in the loop}

简单!

for..await..of
当它变得更稳定时,将在以后的节点实现中对此进行清理。但以上在指定版本及更高版本上运行良好。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/436632.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号