node.js连接Elasticsearch做日志分析,
logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化。
在logstash 配置文件中写清我们的日志文件的路径:
input {
file {
path => "xxxx.log" #监听文件路径
}
}
文件只要变动 就会更新到es中,
我们可以在kibana看到 我们日志中的字段都已json的形式 转载在_source
@timestamp 则是装入的时间戳
node调用 Elasticsearch
var elasticsearch = require('elasticsearch');
var esClient = new elasticsearch.Client({
host: 'http://192.168.129.119:9200',
log: 'error'
});
module.exports = esClient;
当前只使用到了search方法,
更多方法:https://www.npmjs.com/package/elasticsearch
根据需求,按用户、机构、API等查看各个API的访问情况,聚合数据,按小时统计
如 按机构 ,
var index_date="logstash-"+options.date.replace(/-/g, ".");
esClient.search({
index: index_date,
body: {
"size": 0,
"query": {
"bool": {
"must": [
org_search,
user_search,
api_search,
status_search_su
] ,
"must_not": [
status_search_err
],
"should": []
}
},
"aggs": {
"date": {
"date_histogram": {
"field": "@timestamp",
"interval": "hour",
"format": "yyyy-MM-dd HH",
"min_doc_count" : 0,
"extended_bounds" : {
"min": options.date+" 00",
"max": options.date+" 23"
}
},
"aggs": {
"group_by_org": {
"terms": {
"field": "request.headers.orgid.keyword",
"size": 1000
}
}
}
}
}
}
}, function (error, response) {
if(error!=null){
callback(error,null);
}else{
var data=response.aggregations.date.buckets;
callback(null,data);
}
});
其中:
org_search 是 org_search={"term":{"request.headers.orgid.keyword":""+options.orgid+""}}; orgid是机构ID,
"query": {
"bool": {
"must": [
] ,
"must_not": [
],
"should": []
}
}
用于多重过滤,如查询某机构下某用户某返回状态码,,,must是需符合,,must_not是除这个结果之外,,例如想查询异常的 就把,,
status_search_err={"term":{"response.status":"200"}} 为200返回正常的放入 must_not中
而aggs就用来聚合数据,,
按时间戳 整成这一天中 每小时,,都有哪些机构,,分别的访问次数是多少
最后的结果类似于:
"buckets":[{
"key_as_string":"2018-01-22 00",
"key":1,
"doc_count":1
},{
"key_as_string":"2018-01-22 01",
"key":2,
"doc_count":2
},
...
]}
实现top5 功能:
如最新的5个api 的url统计
"aggs": {
"top_tags": {
"terms": {
"field": "api.name.keyword",
"size": 5
},
"aggs": {
"top_url_hits": {
"top_hits": {
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
],
"_source": {
"includes": [ "api.upstream_url" ,"response.status"]
},
"size" : 1
}
}
}
}
}
按最新时间显示前五个