环境

  • Centos 7.4
  • Python 2.7
  • Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
  • Elasitcsearch6.3.2

知识点

  • 调用Python Elasticsearh API
  • Python Mysqldb使用
  • DSL查询与聚合
  • Python 列表操作

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#minyt 2018.9.1
#获取24小时内出现的模块次数
# 该程序通过elasticsearch python client 获取相关精简数据,可以计算请求数、超时数、错误数、正确率、错误率等等
import MySQLdb
from elasticsearch import Elasticsearch
from elasticsearch import helpers

#定义elasticsearch集群索引名
index_name = "logstash-nginxlog-*"

#实例化Elasticsearch类,并设置超时间为180秒,默认是10秒的,如果数据量很大,时间设置更长一些
es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180)

#DSL(领域特定语言)查询语法,查询top50 sname的排列次数
data_sname = {
"aggs": {
"2": {
"terms": {
"field": "apistatus.sname.keyword",
"size": 100,
"order": {
"_count": "desc"
}
}
}
},
"size": 0,
"_source": {
"excludes": []
},
"stored_fields": [
"*"
],
"script_fields": {},
"docvalue_fields": [
"@timestamp"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
},
{
"range": {
"@timestamp": {
"gte" : "now-24h/h",
"lt" : "now/h"
}
}
}
],
"filter": [],
"should": [],
"must_not": []
}
}
}

#按照DSL(特定领域语言)语法查询获取数据
def get_original_data():
try:
#根据上面条件搜索数据
res = es.search(
index=index_name,
size=0,
body=data_sname
)
return res

except:
print "get original data failure"

#初始化数据库
def init_mysql():
# 打开数据库连接
db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 更新语句
sql = "update appname set count=0"
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()

# 关闭数据库连接
db.close()


def updata_mysql(sname_count,sname_list):
# 打开数据库连接
db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 更新语句
sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list)
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()

# 关闭数据库连接
db.close()


#根据Index数据结构通过Elasticsearch Python Client上传数据到新的Index
def import_process_data():
try:
#列表形式显示结果
res = get_original_data()
#print res
res_list = res.get('aggregations').get('2').get('buckets')
#print res_list

#初始化数据库
init_mysql()

#获取24小时内出现的SNAME
for value in res_list:
sname_list = value.get('key')
sname_count = value.get('doc_count')
print sname_list,sname_count
#更新sname_status值
updata_mysql(sname_count,sname_list)

except Exception, e:
print repr(e)


if __name__ == "__main__":
import_process_data()

总结

关键是DSL语法的编写涉及查询与聚合可以通过kibana的visualize或者devtool先测试出正确语法,然后结合python对列表、字典、除法、字符串等操作即可。下面汇总下各个算法: + 总请求 http_host.keyword: api.mydomain.com

  • 超长请求 http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*错误

  • 错误请求 apistatus.status.keyword:*错误 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )

  • 请求健康度 域名与request_time聚合,域名请求时间小于3秒的次数除以总请求次数对应各个域名健康度

  • 请求正确率 域名与http状态码聚合,域名http状态码为200的次数除以域名总请求数对应各个域名的请求正确率