ElasticSearch - Filebeat to Logstash

2018-03-03 posted in [ElasticSearch]

Custom log parsing with Filebeat and Logstash.

Common Architectures

There are two common layouts: run Logstash directly on every data source, or run the lightweight Filebeat on each source and ship to a central Logstash.

Comparing the two, deploying Logstash on the data sources is costly: the package is large, and at runtime it consumes noticeably more CPU and memory than Filebeat.

Filebeat only handles shipping and simple processing and has far fewer features, which is also what makes it cheap to run on the source hosts.

Deployment

This article uses the second layout. Three places need configuration: Filebeat, Logstash, and Elasticsearch.

Filebeat

Filebeat is written in Go and has no special environment requirements. Copy the binary to the target server, adjust the configuration file, and start it: ./filebeat -c filebeatconfig.yml

##
## Copy filebeat.yml and modify only the parts below;
## leave everything else unchanged.
##
## Input (data source) configuration
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /home/path/log/log_file.INFO
  # If the log file is a symlink, this flag must be set,
  # otherwise Filebeat reads no data from it.
  symlinks: true
  fields:
    level: INFO
-
  paths:
    - /home/path/log/log_file.WARNING
  symlinks: true
  fields:
    level: WARNING
-
  paths:
    - /home/path/log/log_file.ERROR
  symlinks: true
  fields:
    level: ERROR

## Alternatively, a single prospector can match several files with a * glob:
#  paths:
#    - /home/path/log/log_file*

## Output configuration
output.logstash:
  # The Logstash hosts
  hosts: ["1.2.3.4:10086"]
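Before starting Filebeat for real, the configuration and the connection to Logstash can be checked first. A small sketch, assuming a Filebeat 6.x binary (where the test subcommand is available):

# Validate the configuration file
./filebeat test config -c filebeatconfig.yml

# Verify that the Logstash output is reachable
./filebeat test output -c filebeatconfig.yml

# Start in the background so it survives logout
nohup ./filebeat -c filebeatconfig.yml >/dev/null 2>&1 &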

Logstash

Logstash requires Java 1.8. As with Filebeat, just edit the configuration and run: ./logstash -f config/logstashconf.conf. Fields are extracted with grok, which matches log lines against a library of named regular-expression patterns.

## Logstash grok 示例
## 正则名称对应的内容参考:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Example 1
#   2018-03-02 00:02:18 pythonfile.py[line:1] INFO aaa bbb ccc
#   %{TIMESTAMP_ISO8601:log_time} %{USERNAME:code_file}\[line:%{NUMBER:code_line}\] %{WORD:log_level}%{SPACE}%{GREEDYDATA:content} 
# Example 2 - stackoverflow
#   14:46:16.603 [http-nio-8080-exec-4] INFO  METERING - msg=93e6dd5e-c009-46b3-b9eb-f753ee3b889a CREATE_JOB job=a820018e-7ad7-481a-97b0-bd705c3280ad data=71b1652e-16c8-4b33-9a57-f5fcb3d5de92
#   %{TIME:timestamp} \[%{USERNAME:http}\] %{WORD:loglevel}%{SPACE}%{WORD:logtype} - msg=%{UUID:msg} %{WORD:action} job=%{UUID:job} data=%{UUID:data}
# Example 3
#   E123 18:28:11.070340   25673 gofile.go:184] aaa bbb ccc
#   %{USERNAME:code}%{SPACE}%{TIME:log_time}%{SPACE}%{NUMBER:pid}%{SPACE}%{USERNAME:code_file}:%{NUMBER:code_line}\]%{SPACE}%{GREEDYDATA:content}
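Before wiring a pattern into the full pipeline, it can be handy to try it against a single sample line. A minimal sketch, assuming Logstash is runnable locally and reusing the Example 3 line above (-e takes the pipeline config as a string):

echo 'E123 18:28:11.070340   25673 gofile.go:184] aaa bbb ccc' | \
./logstash -e '
input { stdin { } }
filter {
  grok {
    match => { "message" => "%{USERNAME:code}%{SPACE}%{TIME:log_time}%{SPACE}%{NUMBER:pid}%{SPACE}%{USERNAME:code_file}:%{NUMBER:code_line}\]%{SPACE}%{GREEDYDATA:content}" }
  }
}
output { stdout { codec => rubydebug } }'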
 
## config/logstashconf.conf
input {
    # Matches the Filebeat output configured above
    beats {
        port => 10086
        ssl => false
    }
}
filter {
    grok {
        # Match each log line and extract its fields.
        # %{TIME:log_time} means: at this position, apply the regex named
        # TIME and store the match in the field log_time.
        match => {"message" => "%{USERNAME:log_type}%{SPACE}%{TIME:log_time}%{SPACE}%{NUMBER:pid}%{SPACE}%{USERNAME:code_file}:%{NUMBER:code_line}\]%{SPACE}%{GREEDYDATA:content}"}
    }
    mutate {
        rename => {
            "source" => "log_file"
            "host" => "log_host"
        }
        add_field => {
            # The log lines carry a time but no date, so first build a
            # field that combines today's date with the parsed time.
            "datetime" => "%{+yyyy-MM-dd} %{log_time}"
            "log_level" => "%{[fields][level]}"
        }
    }
    date {
        # Parse the assembled timestamp into a proper date for ES
        match => [
            "datetime",
            "YYYY-MM-dd HH:mm:ss.SSSSSS"
        ]
        target => "log_time"
    }
    mutate {
        remove_field => [
            "message",
            "tags",
            "offset",
            "prospector",
            "@version",
            "beat",
            "fields",
            "datetime"
        ]
    }
}
output {
    # In production only the elasticsearch output is needed; stdout is
    # handy for debugging. index sets the daily index naming pattern.
    stdout { codec => rubydebug }
    elasticsearch {
        index => "template_name-%{+YYYY.MM.dd}"
        document_type => "template_name"
        hosts => ["1.2.3.4:123","1.2.3.5:123"]
    }
}
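Before starting the pipeline for real, Logstash can check the configuration for syntax errors, and can also reload it automatically when the file changes. A small sketch (both flags exist in Logstash 5.x/6.x; the config path matches the run command above):

# Check the config for syntax errors and exit
./logstash -f config/logstashconf.conf --config.test_and_exit

# Run, reloading the pipeline automatically when the config file changes
./logstash -f config/logstashconf.conf --config.reload.automatic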

ElasticSearch

Elasticsearch needs an index template that supplies the mapping for incoming data.

# Add the template
curl -XPUT http://1.2.3.4:123/_template/template_name -d 'template body (fairly long, shown below)'
# Delete the template
curl -XDELETE http://1.2.3.4:123/_template/template_name

The template body:

{
	"order": 0,
	"template": "template_name-*",
	"settings": {
		"index.refresh_interval": "1s",
		"index.store.type": "mmapfs"
	},
	"mappings": {
		"template_name": {
			"_source": {
				"enabled": true
			},
			"_all": {
				"enabled": false
			},
			"properties": {
				"@timestamp": {
					"index": "not_analyzed",
					"type": "date",
					"store": false,
					"doc_values": true
				},
				"log_time": {
					"index": "not_analyzed",
					"type": "date",
					"store": false,
					"doc_values": true
				},
				"log_host": {
					"index": "not_analyzed",
					"type": "string",
					"store": false
				},
				"log_file": {
					"index": "not_analyzed",
					"type": "string",
					"store": false
				},
				"log_level": {
					"index": "not_analyzed",
					"type": "string",
					"store": false
				},
				"content": {
					"index": "analyzed",
					"type": "string",
					"store": false
				},
				"code_file": {
					"index": "not_analyzed",
					"type": "string",
					"store": false
				},
				"code_line": {
					"index": "not_analyzed",
					"type": "integer",
					"store": false
				},
				"log_type": {
					"index": "not_analyzed",
					"type": "string",
					"store": false
				}
			}
		}
	},
	"aliases": {}
}
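After the PUT, it is worth confirming that the template was actually registered. A quick check against the same host as above (_template also supports GET):

# Fetch the stored template and eyeball the mapping
curl -XGET 'http://1.2.3.4:123/_template/template_name?pretty'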

Notes on a few of the settings:

- index.refresh_interval: how often newly indexed documents become visible to search; 1s is the Elasticsearch default.
- index.store.type: mmapfs maps the index files into memory via mmap.
- _all: disabled, since the catch-all field is rarely queried for logs and costs index space.
- index: not_analyzed: the field is indexed as a single exact value, good for filtering and aggregation; content stays analyzed so it remains full-text searchable.
- doc_values: stores field values in a columnar on-disk structure, keeping sorting and aggregations off the JVM heap.
- store: false: the field is not stored separately; its value is still retrievable from _source.

Kibana

In Kibana, just register the template's indices: under [Setting] -> [Index], click [Add New] and enter a pattern like template_name-*. Then return to [Discover] and select that index to browse the logs.
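If Discover shows nothing, a direct query against the index tells you whether documents are arriving at all. A quick check, using the hostname and index pattern from the examples above:

# Return one recent document from the daily indices
curl -XGET 'http://1.2.3.4:123/template_name-*/_search?size=1&pretty'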