ElasticSearch - Filebeat to Logstash
使用filebeat和logstash实现自定义的日志解析
常用结构
- 数据源直接发送 -> logstash(索引) -> ES
适合数据源具备网络发送能力,logstash通过监听端口接收,数据量不能超过处理能力,会丢失
- 数据源部署filebeat -> logstash(索引) -> ES
数据源为文件,filebeat采集后发送,同样数据量不能过大
- 数据源部署filebeat -> logstash/Flume(收集) -> Kafka -> logstash(索引) -> ES
使用Kafka缓存,同时可以部署多个节点,处理能力较好
经过比较,数据源部署logstash成本较高,不仅文件大,而且运行时占用CPU和内存都显著大于filebeat
filebeat只负责发送和简单处理,功能较少
部署
本文采用第二种方案 需要配置三个地方,filebeat,logstash,elasticsearch
Filebeat
使用golang编写,对环境没有特殊要求
文件复制到指定服务器,修改配置文件后启动即可 ./filebeat -c filebeatconfig.yml
##
## 复制一份filebeat.yml
## 只修改对应部分,其余的不变
##
## 修改数据源配置
filebeat.prospectors:
- type: log
enabled: true
paths:
- /home/path/log/log_file.INFO
# 如果这个文件是软连接,需要开启标志,否则无法获取到数据
symlinks: true
fields:
level: INFO
-
paths:
- /home/path/log/log_file.WARNING
symlinks: true
fields:
level: WARNING
-
paths:
- /home/path/log/log_file.ERROR
symlinks: true
fields:
level: ERROR
## 其他示例,可以是多个文件用 * 匹配
path:
- /home/path/log/log_file*
## 修改输出配置
output.logstash:
# The Logstash hosts
hosts: ["1.2.3.4:10086"]
Logstash
要求Java版本1.8
和Filebeat一样,只需要修改配置运行即可./logstash -f config/logstashconf.conf
grok正则匹配已有类型
## Logstash grok 示例
## 正则名称对应的内容参考:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Example 1
# 2018-03-02 00:02:18 pythonfile.py[line:1] INFO aaa bbb ccc
# %{TIMESTAMP_ISO8601:log_time} %{USERNAME:code_file}\[line:%{NUMBER:code_line}\] %{WORD:log_level}%{SPACE}%{GREEDYDATA:content}
# Example 2 - stackoverflow
# 14:46:16.603 [http-nio-8080-exec-4] INFO METERING - msg=93e6dd5e-c009-46b3-b9eb-f753ee3b889a CREATE_JOB job=a820018e-7ad7-481a-97b0-bd705c3280ad data=71b1652e-16c8-4b33-9a57-f5fcb3d5de92
# %{TIME:timestamp} \[%{USERNAME:http}\] %{WORD:loglevel}%{SPACE}%{WORD:logtype} - msg=%{UUID:msg} %{WORD:action} job=%{UUID:job} data=%{UUID:data}
# Example 3
# E123 18:28:11.070340 25673 gofile.go:184] aaa bbb ccc
# %{USERNAME:code}%{SPACE}%{TIME:log_time}%{SPACE}%{NUMBER:pid}%{SPACE}%{USERNAME:code_file}:%{NUMBER:code_line}\]%{SPACE}%{GREEDYDATA:content}
##
input {
# 对应filebeat的ouput
beats {
port => 10086
ssl => false
}
}
filter {
grok {
# 对一样日志进行匹配,获取里面的信息
# %{TIME:log_time} 表示:在这个位置,使用TIME对应的正则表达式进行匹配,把结果填到log_time这个字段
match => {"message" => "%{USERNAME:log_type}%{SPACE}%{TIME:log_time}%{SPACE}%{NUMBER:pid}%{SPACE}%{USERNAME:code_file}:%{NUMBER:code_line}\]%{SPACE}%{GREEDYDATA:content}"}
}
mutate {
rename => {
"source" => "log_file"
"host" => "log_host"
}
add_field => {
# 由于日志中只有时间没有日期,先添加一个包含日期的字段
"datetime" => "%{+yyyy-MM-dd} %{log_time}"
"log_level" => "%{[fields][level]}"
}
}
date {
# 日志中的时间转换为ES中的时间
match => [
"datetime",
"YYYY-MM-dd HH:mm:ss.SSSSSS"
]
target => "log_time"
}
mutate {
remove_field => [
"message",
"tags",
"offset",
"prospector",
"@version",
"beat",
"tags",
"fields",
"datetime"
]
}
}
output {
# 实际上只需要输出到es即可,此处要设置index分表的名称规则
stdout { codec => rubydebug }
elasticsearch {
index => "template_name-%{+YYYY.MM.dd}"
document_type => "template_name"
hosts => ["1.2.3.4:123","1.2.3.5:123"]
}
}
ElasticSearch
ES中需要添加template,用来对收到的数据进行mapping
# 添加 bdc_worker_log_v1
curl -XPUT http://1.2.3.4:123/_template/template_name -d '索引内容-会比较长'
# 删除 bdc_worker_log_v1
curl -XDELETE http://1.2.3.4:123/_template/template_name
内容如下:
{
"order": 0,
"template": "template_name-*",
"settings": {
"index.refresh_interval": "1s",
"index.store.type": "mmapfs"
},
"mappings": {
"template_name": {
"_source": {
"enabled": true
},
"_all": {
"enabled": false
},
"properties": {
"@timestamp": {
"index": "not_analyzed",
"type": "date",
"store": false,
"doc_values": true
},
"log_time": {
"index": "not_analyzed",
"type": "date",
"store": false,
"doc_values": true
},
"log_host": {
"index": "not_analyzed",
"type": "string",
"store": false
},
"log_file": {
"index": "not_analyzed",
"type": "string",
"store": false
},
"log_level": {
"index": "not_analyzed",
"type": "string",
"store": false
},
"content": {
"index": "analyzed",
"type": "string",
"store": false
},
"code_file": {
"index": "not_analyzed",
"type": "string",
"store": false
},
"code_line": {
"index": "not_analyzed",
"type": "integer",
"store": false
},
"log_type": {
"index": "not_analyzed",
"type": "string",
"store": false
}
}
}
},
"aliases": {}
}
几个配置的介绍
- _source
保存原始数据,否则只保存索引后的结果
- _all
搜索时是否需要指定字段,如果否则需要指定字段,如查询content中包含fail的【content: fail】,如果为true则只需要【fail】
- “index”: “not_analyzed”/”analyzed”
表示该字段是否分词,”analyzed”表示分词,一般是按照空格或者其他特殊符号进行分词。 对于不需要拆分的字段,都要明确设置为not_analyzed的,因为ES默认是全部打开,有些情况下在kibana中不方便统计
- “doc_values”: true
保存该字段的正排索引
Kibana
Kibana 上只需要添加刚加入的template即可,在【Setting】-> 【Index】点击【Add New】,添加类似 template_name-* 即可 返回【Discover】选择对应的索引查看即可