记录bigdesk中ElasticSearch的性能参数

定时采集bigdesk中的Elasticsearch性能参数,并保存到数据库或ELK,以便于进行长期监控。
基于python脚本实现,脚本如下:
#coding=gbk

import httplib
import json
import time
import es_savelog
import ConfigHelper
import MQHelper


def main():

#变量初始化
#上一次统计数据
dictLastNodeInfo={}
#本次统计当前节点
dictNodeInfo={}

print "start..."
while 1==1:
flag=ConfigHelper.GetIntConfig("Flag")
if flag <> 1:
#判断是否满足退出条件
print "终止"+str(flag)
break

urlarray = ConfigHelper.GetStringConfig("EsUrl").split('|')
#取出每次执行完成后的休眠时长:秒
sleeptime=ConfigHelper.GetFloatConfig("SleepTime")

for urlindex in range(0,len(urlarray)):
url=urlarray[urlindex]
conn = httplib.HTTPConnection(url)

#取出ES版本号
conn.request("GET","")
serverinfo=conn.getresponse()
objServerJson=json.loads(serverinfo.read())
esVersion=str(objServerJson["version"]["number"])

#取出集群健康状况
conn.request("GET","/_cluster/health")
healthinfo=conn.getresponse()
objHealthJson=json.loads(healthinfo.read())
health=str(objHealthJson["status"])

#取出各ES节点统计数据
conn.request("GET", "/_nodes/stats?human=true")
nodesread = conn.getresponse()
objNodesJson=json.loads(nodesread.read())

for i in range(0,len(objNodesJson["nodes"].values())):
try:
esNode=objNodesJson["nodes"].values()[i]
nodename=str(esNode["name"])
dictNodeInfo["EsVersion"]=esVersion
dictNodeInfo["Health"]=health

#记录ES节点名称
dictNodeInfo["NodeName"]=nodename
dictNodeInfo["Interval"]=sleeptime

#记录CPU信息
dictNodeInfo["OSUserCpu"]=esNode["os"]["cpu"]["user"]

#记录ThreadpoolCount
dictNodeInfo["ThreadpoolCount"]=esNode["thread_pool"]["search"]["active"]

#记录JVM堆内存
dictNodeInfo["HeapMem"]=float(esNode["jvm"]["mem"]["heap_used"].replace("gb","").replace("mb",""))
curGCYoungCount=int(esNode["jvm"]["gc"]["collectors"]["young"]["collection_count"])
curGCOldCount=int(esNode["jvm"]["gc"]["collectors"]["old"]["collection_count"])
curGCYoungTime=int(esNode["jvm"]["gc"]["collectors"]["young"]["collection_time_in_millis"])
curGNCOldTime=int(esNode["jvm"]["gc"]["collectors"]["old"]["collection_time_in_millis"])
lastGCYoungCount=int(dictLastNodeInfo.get(nodename+"_GCYoungCount",-1))
lastGCOldCount=int(dictLastNodeInfo.get(nodename+"_GCOldCount",-1))
lastGCYoungTime=int(dictLastNodeInfo.get(nodename+"_GCYoungTime",-1))
lastGCOldTime=int(dictLastNodeInfo.get(nodename+"_GCOldTime",-1))
if lastGCYoungCount>=0 and lastGCOldCount>=0 and lastGCYoungTime>=0 and lastGCYoungTime>=0:
dictNodeInfo["GCYoungCount"]=curGCYoungCount-lastGCYoungCount
dictNodeInfo["GCOldCount"]=curGCOldCount-lastGCOldCount
dictNodeInfo["GCYoungTime"]=curGCYoungTime-lastGCYoungTime
dictNodeInfo["GCOldTime"]=curGNCOldTime-lastGCOldTime
if lastGCOldCount>0:
dictNodeInfo["GCYOCountRate"]=lastGCYoungCount/lastGCOldCount
dictLastNodeInfo[nodename+"_GCYoungCount"]=curGCYoungCount
dictLastNodeInfo[nodename+"_GCOldCount"]=curGCOldCount
dictLastNodeInfo[nodename+"_GCYoungTime"]=curGCYoungTime
dictLastNodeInfo[nodename+"_GCOldTime"]=curGNCOldTime

#记录连接数信息
dictNodeInfo["ChannelTransport"]=esNode["transport"]["server_open"]
dictNodeInfo["ChannelHttp"]=esNode["http"]["current_open"]

#记录当前节点Indices-Query信息
objSearch=esNode["indices"]["search"]
curQueryTotal=objSearch["query_total"]
curFetchTotal=objSearch["fetch_total"]
curTimestamp=esNode["timestamp"]
lastQueryTotal=dictLastNodeInfo.get(nodename+"_QueryTotal",-1)
lastFetchTotal=dictLastNodeInfo.get(nodename+"_FetchTotal",-1)
lastTimestamp=dictLastNodeInfo.get(nodename+"_Timestamp",-1)

if lastQueryTotal>0 and curQueryTotal>0:
curQueryCount=curQueryTotal-lastQueryTotal
curFetchCount=curFetchTotal-lastFetchTotal
curQueryTime=(curTimestamp-lastTimestamp)/1000
dictNodeInfo["Interval"]=curQueryTime
#print curQueryTotal,lastQueryTotal,curQueryCount,curTimestamp,lastTimestamp,curQueryTime,curQueryCount/curQueryTime
#记录QPS
if curQueryTime>0:
dictNodeInfo["IndicesQueryPS"]=curQueryCount/curQueryTime
dictNodeInfo["IndicesFetchPS"]=curFetchCount/curQueryTime
#print curQueryCount,curQueryTime,curQueryCount/curQueryTime

#更新上次节点数据对象
dictLastNodeInfo[nodename+"_QueryTotal"]=curQueryTotal
dictLastNodeInfo[nodename+"_FetchTotal"]=curFetchTotal
dictLastNodeInfo[nodename+"_Timestamp"]=curTimestamp

#取出cache信息
dictNodeInfo["FilterCache"] = float(esNode["indices"]["filter_cache"]["memory_size"].replace("mb","").replace("kb",""))
dictNodeInfo["FieldCache"] = float(esNode["indices"]["fielddata"]["memory_size"].replace("mb","").replace("kb",""))

#保存数据到数据库
if(dictNodeInfo.get("IndicesQueryPS",-1) < 0 or dictNodeInfo.get("GCYoungCount",-1) < 0):
continue
es_savelog.SaveLog(dictNodeInfo)

#推送ELK消息
dictNodeInfo["IndexName"] = "esbigdesk"
dictNodeInfo["LogTime"] = time.strftime("%Y-%m-%d %H:%M:%S.000", time.localtime())
print json.dumps(dictNodeInfo)
MQHelper.SendMessage(json.dumps(dictNodeInfo))
dictNodeInfo.clear()
except Exception,ex:
print Exception,":",ex


#休眠
time.sleep(sleeptime)


#启动
if __name__=="__main__":
main()
print "over"
原文地址:https://www.cnblogs.com/lijunhao/p/5341331.html