如何在AWS上将日志从CloudWatch流传输到Elastic Search

您可以通过以下链接以PDF格式下载本文来支持我们。以PDF格式下载指南关闭关闭关闭

本文旨在向读者解释将日志从CloudWatch日志组流到AWS Elasticsearch集群的分步过程。读者需要创建lambda策略和角色。下面介绍如何创建ElasticSearch订阅过滤器。最后,我们向用户展示如何修改lambda函数,以允许将多个日志组流式传输到群集。

先决条件

在执行本文中的步骤之前,读者应执行以下操作:

  • AWS账户。
  • 我创建了一个有权使用AWS账户创建资源的用户。
  • 您可以使用您的AWS账户创建Elasticsearch集群,并通过VPC或Internet终端节点访问该集群。

创建Lambda执行角色

使用lambda函数将日志流式传输到Elasticsearch。 在AWS IAM控制台中,单击策略。下一个,[ポリシーの作成]选择。

建立政策

将会打开一个窗口。下一个,[JSON]选择一个标签。

如何在AWS上将日志从CloudWatch流传输到Elastic Search创建一个JSON策略

[JSON]在选项卡上,粘贴以下命令。同样重要 替换资源,使用Elasticsearch集群arn。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:*"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:eu-west-1:****************:domain/test-es-cluster/*"
        }
    ]
}

[ポリシーの確認]单击选项卡,然后输入策略的名称和说明。下一个,[ポリシーの作成]单击。完成后,仍在IAM控制台中,返回到该角色,[ロールの作成]单击。

如何在AWS上将日志从CloudWatch流传输到Elastic Search创建角色

选择一个服务角色。另外,选择Lambda作为用例。

如何在AWS上将日志从CloudWatch流传输到Elastic Search服务和用例选择

然后单击“权限”,单击“策略”,然后选择您之前创建的策略。

如何在AWS上将日志从CloudWatch流传输到Elastic Search将策略附加到角色

[タグ]单击以添加用于该角色的标签。下一个,[レビュー]单击并输入角色的名称和描述。下一个,[役割の作成]单击。现在您已经准备好担任Lambda的角色。

编辑lambda角色的信任关系

在AWS IAM控制台中,选择上面创建的lambda角色。下一个,[信頼関係]选择一个标签,然后[信頼関係の編集]单击。

如何在AWS上将日志从CloudWatch流传输到Elastic Search编辑lambda角色的信任关系

在打开的窗口中,删除所有策略文档,然后粘贴以下代码。下一个,[信頼ポリシーの更新]单击。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

现在您可以将日志流传输到Lambdalol中的Elasticsearch Kibana。

为日志组创建Elasticsearch订阅

在CloudWatch控制台中,选择一个日志组。 选择您要为其创建Elasticsearch订阅的日志组。在日志组窗口中,选择一个操作,然后从下拉菜单中[Elasticsearchサブスクリプションフィルターの作成]选择。

如何在AWS上将日志从CloudWatch流传输到Elastic Search创建一个ElasticSearch订阅过滤器

在打开的窗口中,选择为其创建ES群集的帐户。在这种情况下,它与CloudWatch日志组位于同一帐户中。 我选择“此帐户”,然后选择Amazon ES集群以流式传输日志。然后从Lambda函数下方的下拉列表中选择您先前创建的Lambda执行角色。

如何在AWS上将日志从CloudWatch流传输到Elastic Search选择Elasticsearch集群和Lambda执行角色

向下滚动,将提示您配置日志格式和过滤器。见下文。

如何在AWS上将日志从CloudWatch流传输到Elastic Search日志格式和过滤器配置

请选择所需的日志。您还可以测试并查看其外观。如果你满意[ストリーミングの開始]单击。您现在应该将日志视为Elasticsearch Kibana的索引。

修改Lambda函数以流式传输来自多个日志组的日志

要将日志从多个CloudWatch日志组流到Elasticsearch集群,您需要修改上面创建的原始Lambda函数的代码。 用以下代码替换Lambda函数代码。您只需更改var端点(代码段的第5行)即可。确保将其替换为Elasticsearch集群端点。完成后[保存]单击。

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint="search-test-es-cluster-**************************.eu-west-1.es.amazonaws.com";

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // index name format: cwl-YYYY.MM.DD
        //var indexName = [
            //'cwl-' + timestamp.getUTCFullYear(),              // year
            //('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            //('0' + timestamp.getUTCDate()).slice(-2)          // day
        //].join('.');
        // index name format: cwl-YYYY.MM.DD
        //var appName =payload.logGroup.toLowerCase();
        //var indexName="";
        var indexName = [
            'cwl-' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(), // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('n') + 'n';
    });
    return bulkRequestBody;
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}

function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^.]+).?([^.]*).?([^.]*).amazonaws.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:-]|.d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=" + process.env.AWS_ACCESS_KEY_ID + "/' + credentialString,
        'SignedHeaders=" + signedHeaders,
        "Signature=" + hmac(kSigning, stringToSign, "hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));

        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

您现在可以将多个日志流传输到Elasticsearch集群Kibana。

相关文章:

  • 在AWS上使用Kibana设置Elasticsearch集群
  • 在不影响master分支的情况下在Git中实现新功能
  • 将AWS VPC流日志设置为CloudWatch Log组

AWS学习课程:


AWS认证解决方案架构师-助理2020

AWS认证解决方案架构师-助理2020

★★★★★(190636)$ 14.08 $ 152.48有库存

立即购买

如何在AWS上将日志从CloudWatch流传输到Elastic SearchUdemy.com


最终AWS认证解决方案架构师2020年助理

最终AWS认证解决方案架构师2020年助理

★★★★★(45225)$ 16.42 $ 175.94有库存

立即购买

如何在AWS上将日志从CloudWatch流传输到Elastic SearchUdemy.com


AWS认证开发人员协会2020

AWS认证开发人员协会2020

★★★★☆(37770)$ 14.08 $ 152.48有库存

立即购买

如何在AWS上将日志从CloudWatch流传输到Elastic SearchUdemy.com


最终AWS认证开发人员助理2020-新!

最终AWS认证开发人员助理2020-新!

★★★★★(26864)$ 19.94 $ 199.40有库存

立即购买

如何在AWS上将日志从CloudWatch流传输到Elastic SearchUdemy.com您可以从下面的链接以PDF格式下载本文,以为我们提供支持。以PDF格式下载指南关闭关闭关闭

Sidebar