ElasticSearch - Python ES & Query DSL
ES 优化及相关疑问
ELK Stack 在新浪微博的最佳实践(实录) ELK Stack 在新浪微博的最佳实践(问答)
最佳实践
分享一点具体在公司的应用规模和优化点。在新浪,目前我们团队的 ES 是有 25 种不同的日志,保留最近 7 天数据,合计 650 亿条。花了 26 个数据节点,42G 内存,2.4T SAS 磁盘,8 核 CPU 的配置。这个规模踩过的坑也不少。
- 通过 doc_values 设置,预先生成 fielddata 到磁盘上。否则每次搜索时再生成 fielddata 到内存,分分钟爆内存;
- Recovery 和 Relocation。ES 默认的这方面的参数极其保守,不加大的话,一次重启可能一两天都恢复不完;
- 自动发现。默认的 multicast 包在公有云上会被认为是恶意扫描攻击;
- ES 的默认分片原则是尽量保证各节点的分片总数一致。而作为日志,一般只有当天的索引是有 IO 压力的。那么新加节点第二天可能因为分片没迁移完(尤其是前面第二条说的,保守的默认配置下),导致 ES 直接把当天的分片全分配到这一台上,然后直接压死。这个需要在索引级别指定单节点最多容许分配几个分片。ES 在分布式结构上跟 MongoDB 有点像,比如 Shard、Replication。但是 ES 的 translog 是 flush 完就删的,不会长期保持。所以在 Recovery 的时候,Replication、Shard 是每次都要从 Primary Shard 完整的走网络再传一次,这点很头大。所以计划内的重启操作,提前先把自动 Allocate 暂停掉,免得浪费流量。ES 的默认 Shard 策略是达到各节点的 Shard 总数均衡,不考虑 IO。而做日志的时候,只有最近的索引的分片才有 IO 压力。所以如果你今天新上一个节点,然后隔天新建索引,一看,所有新分片全给分配到这台上,直接 IO 压垮。这个是有一个配置,针对索引级别的,可以对每个索引指定在单个节点上,最多分配几个分片。如果集群 Scaling 是比较经常见的时期,一定配上这个;
- ES是 schema-less,不是 no-schema。对同一个索引下,字段名字如果一样,而类型不一样的,他在 indexing的 时候会按照第一个确定下来的类型做处理。比如 indexa/typea 下一个 key:112,indexa/typeb 下一个 key:”abc”。写入都能成功,但是搜索的时候,就全乱套了。对应前面说的不分词字段,Logstash 默认带的 template 会加一个 ignore_above:256,也是就是大于 256 字节的,也跳过 indexing 过程,那么这条数据的这个字段的内容,就即搜不到,也统计不到。只能在搜其他字段的时候看到这个的内容。我们在做 Crash 的日志排序的时候,就碰到这个问题,函数堆栈很容易就超过 256 字节的。所以要单独控制一下,特殊的字段,把这个加大。
Python ES基本
安装
根据ES的版本进行选择,本文中ES版本2.4.1,所以使用pip install elasticsearch2
进行安装
建立es连接
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'192.168.1.1','port':8200}])
数据检索
使用【q】参数,表示使用的是lucene风格的语法,就是kibana前端的语法 使用【body】参数,表示使用的是DSL语法,就是一个json的格式,可以使用curl命令直接获取数据的
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
常用参数
参数名 | 功能 |
---|---|
index | 索引名 |
q | 查询指定匹配 使用Lucene查询语法 |
from_ | 查询起始点 默认0 |
doc_type | 文档类型 |
size | 指定查询条数 默认10 |
field | 指定字段 逗号分隔 |
sort | 排序 字段:asc/desc |
body | 使用Query DSL |
scroll | 滚动查询 |
统计查询功能
语法同search大致一样,但只输出统计值
es.count(index='logstash-2015.08.21', q='http_status_code:500')
#{u'_shards':{u'failed':0, u'successful':5, u'total':5}, u'count':17042}
Demo:滚动查询全部数据
实现了一次取若干数据,数据取完之后结束。需要注意的是,这个查询不会获取到最新更新的数据。滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。
# Initialize the scroll
page = es.search(
index ='yourIndex',
doc_type ='yourType',
scroll ='2m',
search_type ='scan',
size =1000,
body ={
# Your query's body
})
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while(scroll_size >0):
print "Scrolling..."
page = es.scroll(scroll_id = sid, scroll ='2m')
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print "scroll size: "+ str(scroll_size)
# Do something with the obtained page
Query DSL
基本功能介绍
range过滤器查询一定范围
参数名 | 功能 |
---|---|
gt | > 大于 |
lt | < 小于 |
gte | >= 大于或等于 |
lte | <= 小于或等于 |
"range":{
"money":{
"gt":20,
"lt":40
}
}
bool组合过滤器
参数名 | 功能 |
---|---|
must | 所有分句都必须匹配,与 AND 相同 |
must_not | 所有分句都必须不匹配,与 NOT 相同 |
should | 至少有一个分句匹配,与 OR 相同 |
{
"bool":{
"must":[],
"should":[],
"must_not":[],
}
}
term过滤器
字段包含关键词
{
"terms":{
"money":20 # 或者 "money": [20,30]
}
}
正则
{
"regexp": {
"http_status_code": "5.*"
}
}
match查询
# 精确匹配
{
"match":{
"email":"123456@qq.com"
}
}
# 多字段搜索
{
"multi_match":{
"query":"11",
"fields":["Tr","Tq"]
}
}
DSL Demo
获取最近一小时的数据
{
'query':
{'filtered':
{'filter':
{'range':
{'@timestamp':{'gt':'now-1h'}}
}
}
}
}
条件过滤查询
{
"query":{
"filtered":{
"query":{"match":{"http_status_code":500}},
"filter":{"term":{"server_name":"vip03"}}
}
}
}
Terms Facet 单字段统计
{'facets':
{'stat':
{'terms':
{'field':'http_status_code',
'order':'count',
'size':50}
}
},
'size':0
}
一次统计多个字段
{'facets':
{'cip':
{'terms':
{'fields':['client_ip']}},
'status_facets':{'terms':{'fields':['http_status_code'],
'order':'term',
'size':50}}},
'query':{'query_string':{'query':'*'}},
'size':0
}
多个字段一起统计
{'facets':
{'tag':
{'terms':
{'fields':['http_status_code','client_ip'],
'size':10
}
}
},
'query':
{'match_all':{}},
'size':0
}
统计一段时间内的IP的数量和流量
{
"size":0, # 去掉hits的内容
"query": {
"filtered": {
"filter": {
"range": {
"@timestamp": { # 近15分钟
"gt": "now-15m",
"lt": "now"
}
}
}
}
},
"aggs": {
"execute_time": {
"terms": { # 按匹配的条件进行数量统计
"size":99, # 99条统计结果,2.x版本以上不支持显示先去结果
"field": "ipv4_src_addr"
},
"aggs": { # 在一个语句aggs的内部嵌套一个aggs进行求和,类似组合 sum + group by
"do_a_sum_on_field_yyy": {
"sum": {
"field": "in_bytes"
}
}
}
}
}
}
统计一段时间内的日志数量
{
"facets": {
"0": {
"date_histogram": {
"field": "@timestamp",
"interval": "5m"
},
"facet_filter": {
"fquery": {
"query": {
"filtered": {
"query": {
"query_string": {
"query": "*"
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
'gt': 'now-1h'
}
}
},
{
"exists": {
"field": "http_status_code.raw"
}
},
{
"query": {
"query_string": {"query": "backend_name:baidu.com"}
}
},
]
}
}
}
}
}
}
}
},
"size": 0
}