栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

开始流式传输到Amazon Elasticsearch Service时如何选择不同的Lambda函数 elasticsearch

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

开始流式传输到Amazon Elasticsearch Service时如何选择不同的Lambda函数 elasticsearch

我使用AWS控制台选项(开始流式传输
到Amazon Elasticsearch Service)流式传输到ELK ,但是我无法更改或选择其他
lambda函数,因为
使用此选项只能为任何日志组选择lambda函数。

因此,我创建了新的lambda函数并将流目标设置为AWS lambda函数,

这是您所需的全部代码,lambda函数的Node版本为4. *,因为
新版本存在一些问题,但最重要的是它
不需要任何额外的NPM软件包。
require any extra NPM packages.

// v1.1.2var https = require('https');var zlib = require('zlib');var crypto = require('crypto');var endpoint = 'search-my-test.us-west-2.es.amazonaws.com';exports.handler = function(input, context) {    // depre input from base64    var zippedInput = new Buffer(input.awslogs.data, 'base64');    // decompress the input    zlib.gunzip(zippedInput, function(error, buffer) {        if (error) { context.fail(error); return; }        // parse the input from JSON        var awslogsData = JSON.parse(buffer.toString('utf8'));        // transform the input to Elasticsearch documents        var elasticsearchBulkData = transform(awslogsData);        // skip control messages        if (!elasticsearchBulkData) { console.log('Received a control message'); context.succeed('Control message handled successfully'); return;        }        // post documents to the Amazon Elasticsearch Service        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) { console.log('Response: ' + JSON.stringify({      "statusCode": statusCode  })); if (error) {      console.log('Error: ' + JSON.stringify(error, null, 2));     if (failedItems && failedItems.length > 0) {         console.log("Failed Items: " +  JSON.stringify(failedItems, null, 2));     }     context.fail(JSON.stringify(error)); } else {     console.log('Success: ' + JSON.stringify(success));     context.succeed('Success'); }        });    });};function transform(payload) {    if (payload.messageType === 'CONTROL_MESSAGE') {        return null;    }    var bulkRequestBody = '';    payload.logEvents.forEach(function(logEvent) {        var timestamp = new Date(1 * logEvent.timestamp);        // index name format: cwl-YYYY.MM.DD        var indexName = [ 'prod-background-wo-' + timestamp.getUTCFullYear(),   // year ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month ('0' + timestamp.getUTCDate()).slice(-2)          // day        ].join('.');        var source = buildSource(logEvent.message, logEvent.extractedFields);        source['response_time'] = source["end"] - source["start"];        source['@id'] = logEvent.id;        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();        source['@message'] = logEvent.message;        source['@owner'] = payload.owner;        source['@log_group'] = payload.logGroup;        source['@log_stream'] = payload.logStream;        var action = { "index": {} };        action.index._index = indexName;        action.index._type = payload.logGroup;        action.index._id = logEvent.id;        bulkRequestBody += [  JSON.stringify(action),  JSON.stringify(source),        ].join('n') + 'n';    });    return bulkRequestBody;}function buildSource(message, extractedFields) {    if (extractedFields) {        var source = {};        for (var key in extractedFields) { if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {     var value = extractedFields[key];     if (isNumeric(value)) {         source[key] = 1 * value;         continue;     }     jsonSubString = extractJson(value);     if (jsonSubString !== null) {         source['$' + key] = JSON.parse(jsonSubString);     }     source[key] = value; }        }        return source;    }    jsonSubString = extractJson(message);    if (jsonSubString !== null) {         return JSON.parse(jsonSubString);     }    return {};}function extractJson(message) {    var jsonStart = message.indexOf('{');    if (jsonStart < 0) return null;    var jsonSubString = message.substring(jsonStart);    return isValidJson(jsonSubString) ? jsonSubString : null;}function isValidJson(message) {    try {        JSON.parse(message);    } catch (e) { return false; }    return true;}function isNumeric(n) {    return !isNaN(parseFloat(n)) && isFinite(n);}function post(body, callback) {    var requestParams = buildRequest(endpoint, body);    var request = https.request(requestParams, function(response) {        var responseBody = '';        response.on('data', function(chunk) { responseBody += chunk;        });        response.on('end', function() { var info = JSON.parse(responseBody); var failedItems; var success; if (response.statusCode >= 200 && response.statusCode < 299) {     failedItems = info.items.filter(function(x) {         return x.index.status >= 300;     });     success = {          "attemptedItems": info.items.length,         "successfulItems": info.items.length - failedItems.length,         "failedItems": failedItems.length     }; } var error = response.statusCode !== 200 || info.errors === true ? {     "statusCode": response.statusCode,     "responseBody": responseBody } : null; callback(error, success, response.statusCode, failedItems);        });    }).on('error', function(e) {        callback(e);    });    request.end(requestParams.body);}function buildRequest(endpoint, body) {    var endpointParts = endpoint.match(/^([^.]+).?([^.]*).?([^.]*).amazonaws.com$/);    var region = endpointParts[2];    var service = endpointParts[3];    var datetime = (new Date()).toISOString().replace(/[:-]|.d{3}/g, '');    var date = datetime.substr(0, 8);    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);    var kRegion = hmac(kDate, region);    var kService = hmac(kRegion, service);    var kSigning = hmac(kService, 'aws4_request');    var request = {        host: endpoint,        method: 'POST',        path: '/_bulk',        body: body,        headers: {  'Content-Type': 'application/json', 'Host': endpoint, 'Content-Length': Buffer.byteLength(body), 'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN, 'X-Amz-Date': datetime        }    };    var canonicalHeaders = Object.keys(request.headers)        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })        .join('n');    var signedHeaders = Object.keys(request.headers)        .map(function(k) { return k.toLowerCase(); })        .sort()        .join(';');    var canonicalString = [        request.method,        request.path, '',        canonicalHeaders, '',        signedHeaders,        hash(request.body, 'hex'),    ].join('n');    var credentialString = [ date, region, service, 'aws4_request' ].join('/');    var stringToSign = [        'AWS4-HMAC-SHA256',        datetime,        credentialString,        hash(canonicalString, 'hex')    ] .join('n');    request.headers.Authorization = [        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,        'SignedHeaders=' + signedHeaders,        'Signature=' + hmac(kSigning, stringToSign, 'hex')    ].join(', ');    return request;}function hmac(key, str, encoding) {    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);}function hash(str, encoding) {    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/409509.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号