如何在AWS上将日志从CloudWatch流传输到Elastic Search
您可以通过以下链接以PDF格式下载本文来支持我们。以PDF格式下载指南关闭关闭关闭
本文旨在向读者解释将日志从CloudWatch日志组流到AWS Elasticsearch集群的分步过程。读者需要创建lambda策略和角色。下面介绍如何创建ElasticSearch订阅过滤器。最后,我们向用户展示如何修改lambda函数,以允许将多个日志组流式传输到群集。
先决条件
在执行本文中的步骤之前,读者应执行以下操作:
- AWS账户。
- 我创建了一个有权使用AWS账户创建资源的用户。
- 您可以使用您的AWS账户创建Elasticsearch集群,并通过VPC或Internet终端节点访问该集群。
创建Lambda执行角色
使用lambda函数将日志流式传输到Elasticsearch。 在AWS IAM控制台中,单击策略。下一个,[ポリシーの作成]选择。
建立政策
将会打开一个窗口。下一个,[JSON]选择一个标签。
创建一个JSON策略
[JSON]在选项卡上,粘贴以下命令。同样重要 替换资源,使用Elasticsearch集群arn。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"es:*"
],
"Effect": "Allow",
"Resource": "arn:aws:es:eu-west-1:****************:domain/test-es-cluster/*"
}
]
}
[ポリシーの確認]单击选项卡,然后输入策略的名称和说明。下一个,[ポリシーの作成]单击。完成后,仍在IAM控制台中,返回到该角色,[ロールの作成]单击。
创建角色
选择一个服务角色。另外,选择Lambda作为用例。
服务和用例选择
然后单击“权限”,单击“策略”,然后选择您之前创建的策略。
将策略附加到角色
[タグ]单击以添加用于该角色的标签。下一个,[レビュー]单击并输入角色的名称和描述。下一个,[役割の作成]单击。现在您已经准备好担任Lambda的角色。
编辑lambda角色的信任关系
在AWS IAM控制台中,选择上面创建的lambda角色。下一个,[信頼関係]选择一个标签,然后[信頼関係の編集]单击。
编辑lambda角色的信任关系
在打开的窗口中,删除所有策略文档,然后粘贴以下代码。下一个,[信頼ポリシーの更新]单击。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
现在您可以将日志流传输到Lambdalol中的Elasticsearch Kibana。
为日志组创建Elasticsearch订阅
在CloudWatch控制台中,选择一个日志组。 选择您要为其创建Elasticsearch订阅的日志组。在日志组窗口中,选择一个操作,然后从下拉菜单中[Elasticsearchサブスクリプションフィルターの作成]选择。
创建一个ElasticSearch订阅过滤器
在打开的窗口中,选择为其创建ES群集的帐户。在这种情况下,它与CloudWatch日志组位于同一帐户中。 我选择“此帐户”,然后选择Amazon ES集群以流式传输日志。然后从Lambda函数下方的下拉列表中选择您先前创建的Lambda执行角色。
选择Elasticsearch集群和Lambda执行角色
向下滚动,将提示您配置日志格式和过滤器。见下文。
日志格式和过滤器配置
请选择所需的日志。您还可以测试并查看其外观。如果你满意[ストリーミングの開始]单击。您现在应该将日志视为Elasticsearch Kibana的索引。
修改Lambda函数以流式传输来自多个日志组的日志
要将日志从多个CloudWatch日志组流到Elasticsearch集群,您需要修改上面创建的原始Lambda函数的代码。 用以下代码替换Lambda函数代码。您只需更改var端点(代码段的第5行)即可。确保将其替换为Elasticsearch集群端点。完成后[保存]单击。
// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
var endpoint="search-test-es-cluster-**************************.eu-west-1.es.amazonaws.com";
// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;
exports.handler = function(input, context) {
// decode input from base64
var zippedInput = new Buffer.from(input.awslogs.data, 'base64');
// decompress the input
zlib.gunzip(zippedInput, function(error, buffer) {
if (error) { context.fail(error); return; }
// parse the input from JSON
var awslogsData = JSON.parse(buffer.toString('utf8'));
// transform the input to Elasticsearch documents
var elasticsearchBulkData = transform(awslogsData);
// skip control messages
if (!elasticsearchBulkData) {
console.log('Received a control message');
context.succeed('Control message handled successfully');
return;
}
// post documents to the Amazon Elasticsearch Service
post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
console.log('Response: ' + JSON.stringify({
"statusCode": statusCode
}));
if (error) {
logFailure(error, failedItems);
context.fail(JSON.stringify(error));
} else {
console.log('Success: ' + JSON.stringify(success));
context.succeed('Success');
}
});
});
};
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null;
}
var bulkRequestBody = '';
payload.logEvents.forEach(function(logEvent) {
var timestamp = new Date(1 * logEvent.timestamp);
// index name format: cwl-YYYY.MM.DD
//var indexName = [
//'cwl-' + timestamp.getUTCFullYear(), // year
//('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month
//('0' + timestamp.getUTCDate()).slice(-2) // day
//].join('.');
// index name format: cwl-YYYY.MM.DD
//var appName =payload.logGroup.toLowerCase();
//var indexName="";
var indexName = [
'cwl-' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(), // log group + year
('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month
('0' + timestamp.getUTCDate()).slice(-2) // day
].join('.');
var source = buildSource(logEvent.message, logEvent.extractedFields);
source['@id'] = logEvent.id;
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
source['@message'] = logEvent.message;
source['@owner'] = payload.owner;
source['@log_group'] = payload.logGroup;
source['@log_stream'] = payload.logStream;
var action = { "index": {} };
action.index._index = indexName;
action.index._type = payload.logGroup;
action.index._id = logEvent.id;
bulkRequestBody += [
JSON.stringify(action),
JSON.stringify(source),
].join('n') + 'n';
});
return bulkRequestBody;
}
function buildSource(message, extractedFields) {
if (extractedFields) {
var source = {};
for (var key in extractedFields) {
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
var value = extractedFields[key];
if (isNumeric(value)) {
source[key] = 1 * value;
continue;
}
jsonSubString = extractJson(value);
if (jsonSubString !== null) {
source['$' + key] = JSON.parse(jsonSubString);
}
source[key] = value;
}
}
return source;
}
jsonSubString = extractJson(message);
if (jsonSubString !== null) {
return JSON.parse(jsonSubString);
}
return {};
}
function extractJson(message) {
var jsonStart = message.indexOf('{');
if (jsonStart < 0) return null;
var jsonSubString = message.substring(jsonStart);
return isValidJson(jsonSubString) ? jsonSubString : null;
}
function isValidJson(message) {
try {
JSON.parse(message);
} catch (e) { return false; }
return true;
}
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n);
}
function post(body, callback) {
var requestParams = buildRequest(endpoint, body);
var request = https.request(requestParams, function(response) {
var responseBody = '';
response.on('data', function(chunk) {
responseBody += chunk;
});
response.on('end', function() {
var info = JSON.parse(responseBody);
var failedItems;
var success;
var error;
if (response.statusCode >= 200 && response.statusCode < 299) {
failedItems = info.items.filter(function(x) {
return x.index.status >= 300;
});
success = {
"attemptedItems": info.items.length,
"successfulItems": info.items.length - failedItems.length,
"failedItems": failedItems.length
};
}
if (response.statusCode !== 200 || info.errors === true) {
// prevents logging of failed entries, but allows logging
// of other errors such as access restrictions
delete info.items;
error = {
statusCode: response.statusCode,
responseBody: info
};
}
callback(error, success, response.statusCode, failedItems);
});
}).on('error', function(e) {
callback(e);
});
request.end(requestParams.body);
}
function buildRequest(endpoint, body) {
var endpointParts = endpoint.match(/^([^.]+).?([^.]*).?([^.]*).amazonaws.com$/);
var region = endpointParts[2];
var service = endpointParts[3];
var datetime = (new Date()).toISOString().replace(/[:-]|.d{3}/g, '');
var date = datetime.substr(0, 8);
var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
var kRegion = hmac(kDate, region);
var kService = hmac(kRegion, service);
var kSigning = hmac(kService, 'aws4_request');
var request = {
host: endpoint,
method: 'POST',
path: '/_bulk',
body: body,
headers: {
'Content-Type': 'application/json',
'Host': endpoint,
'Content-Length': Buffer.byteLength(body),
'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
'X-Amz-Date': datetime
}
};
var canonicalHeaders = Object.keys(request.headers)
.sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
.map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
.join('n');
var signedHeaders = Object.keys(request.headers)
.map(function(k) { return k.toLowerCase(); })
.sort()
.join(';');
var canonicalString = [
request.method,
request.path, '',
canonicalHeaders, '',
signedHeaders,
hash(request.body, 'hex'),
].join('n');
var credentialString = [ date, region, service, 'aws4_request' ].join('/');
var stringToSign = [
'AWS4-HMAC-SHA256',
datetime,
credentialString,
hash(canonicalString, 'hex')
] .join('n');
request.headers.Authorization = [
'AWS4-HMAC-SHA256 Credential=" + process.env.AWS_ACCESS_KEY_ID + "/' + credentialString,
'SignedHeaders=" + signedHeaders,
"Signature=" + hmac(kSigning, stringToSign, "hex')
].join(', ');
return request;
}
function hmac(key, str, encoding) {
return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
function hash(str, encoding) {
return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
function logFailure(error, failedItems) {
if (logFailedResponses) {
console.log('Error: ' + JSON.stringify(error, null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " +
JSON.stringify(failedItems, null, 2));
}
}
}
您现在可以将多个日志流传输到Elasticsearch集群Kibana。
重要连结
相关文章:
- 在AWS上使用Kibana设置Elasticsearch集群
- 在不影响master分支的情况下在Git中实现新功能
- 将AWS VPC流日志设置为CloudWatch Log组
AWS学习课程:
★★★★★(190636)$ 14.08 $ 152.48有库存
Udemy.com
★★★★★(45225)$ 16.42 $ 175.94有库存
Udemy.com
★★★★☆(37770)$ 14.08 $ 152.48有库存
Udemy.com
★★★★★(26864)$ 19.94 $ 199.40有库存
Udemy.com您可以从下面的链接以PDF格式下载本文,以为我们提供支持。以PDF格式下载指南关闭关闭关闭