Python 企业微信告警

目标:

为了实现服务器性能监控指标告警,通过alertmanager触发告警后发送到webhook,然后通过python脚本实现数据清洗,并把告警内容发送给指定的企业微信用户。

由于频繁调用企业微信 API 可能会被管理员拉入小黑屋,所以需要把获取到的 access_token 缓存到 Redis 上,之后直接从 Redis 读取即可。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import json
import time
import requests
import os
import redis
from flask import request, Flask    


class qywx_obj:
    """Small client for the WeChat Work (qyapi.weixin.qq.com) HTTP API.

    It obtains access tokens, lists the members of department 1 (child
    departments included) and sends text messages through one application.

    ``access_token`` and ``user_info`` are fetched on first access rather
    than in ``__init__``, so creating an instance performs no HTTP request
    and callers that already hold cached values never trigger a fetch.
    """

    API_BASE = 'https://qyapi.weixin.qq.com/cgi-bin'
    # Seconds to wait for the API, so a stalled connection cannot block the
    # caller indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.CORPID = 'xxxxxxxxxxxxx'   # corpid of the enterprise
        self.CORPSECRET_SEND_MSG = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # secret of the application
        self.AGENTID = 1000002
        # Filled lazily by the properties below.
        self._access_token = None
        self._user_info = None

    @property
    def access_token(self):
        """Access token of the messaging application, fetched on first use."""
        if self._access_token is None:
            self._access_token = self.get_access_token(self.CORPSECRET_SEND_MSG)
        return self._access_token

    @access_token.setter
    def access_token(self, value):
        self._access_token = value

    @property
    def user_info(self):
        """Member list returned by ``get_user_info``, fetched on first use."""
        if self._user_info is None:
            self._user_info = self.get_user_info()
        return self._user_info

    @user_info.setter
    def user_info(self, value):
        self._user_info = value

    @staticmethod
    def _require_field(payload, field):
        """Return ``payload[field]``.

        Raises:
            RuntimeError: if the field is absent; the message carries the
                ``errcode``/``errmsg`` values found in the payload.
        """
        if field not in payload:
            raise RuntimeError(
                'WeChat Work API error %s: %s'
                % (payload.get('errcode'), payload.get('errmsg'))
            )
        return payload[field]

    @staticmethod
    def _as_text(token):
        """Decode a ``bytes`` token (e.g. one read back from Redis) to ``str``."""
        if isinstance(token, bytes):
            return token.decode('utf-8')
        return token

    def get_access_token(self, CORPSECRET):
        """Request an access token for the given secret.

        Args:
            CORPSECRET: secret of the application or of the contacts API.

        Returns:
            The ``access_token`` string from the API response.

        Raises:
            RuntimeError: if the response contains no ``access_token``.
        """
        url = self.API_BASE + '/gettoken'
        values = {
            'corpid': self.CORPID,
            'corpsecret': CORPSECRET,
        }
        resp = requests.get(url, params=values, timeout=self.REQUEST_TIMEOUT)
        return self._require_field(resp.json(), 'access_token')

    def get_user_info(self):
        """Fetch the members of department 1, child departments included.

        Returns:
            The ``userlist`` value of the API response.

        Raises:
            RuntimeError: if the response contains no ``userlist``.
        """
        GET_USER_URL = self.API_BASE + '/user/list'
        # Secret of the contacts API, used to read department members.
        CORPSECRET_GET_INFO = 'xxxxxxxxxxxxxxx'
        values = {
            'access_token': self.get_access_token(CORPSECRET_GET_INFO),
            'department_id': 1,
            'fetch_child': 1,
        }
        resp = requests.get(GET_USER_URL, params=values, timeout=self.REQUEST_TIMEOUT)
        return self._require_field(resp.json(), 'userlist')

    def send_data(self, TOUSER, message, access_token):
        """Send a text message to one user.

        Args:
            TOUSER: ``userid`` of the recipient.
            message: text content of the message.
            access_token: token to authenticate with, as ``str`` or
                ``bytes``; when empty or ``None`` the instance's own
                ``access_token`` is used instead.

        Returns:
            The ``errmsg`` field of the API response.
        """
        token = self._as_text(access_token) or self.access_token
        send_values = {
            "touser": TOUSER,
            "msgtype": "text",
            "agentid": self.AGENTID,
            "text": {
                "content": message
            },
            "safe": "0"
        }
        body = json.dumps(send_values).encode('utf-8')
        resp = requests.post(
            self.API_BASE + '/message/send',
            params={'access_token': token},
            data=body,
            timeout=self.REQUEST_TIMEOUT,
        )
        return resp.json()["errmsg"]

class redis_obj:
    """Thin wrapper around a Redis connection used as an expiring cache."""

    # Connection pools keyed by (host, port) and shared by all instances, so
    # creating a wrapper per request does not build a new pool each time.
    _pools = {}

    def __init__(self, host="172.16.40.24", port=6379):
        """Connect to database 0 of the Redis server at ``host``:``port``."""
        self.host = host
        self.port = port
        self.r = self.get_redis_obj()

    def get_redis_obj(self):
        """Return a Redis client backed by the shared pool for this server."""
        pool_key = (self.host, self.port)
        pool = self._pools.get(pool_key)
        if pool is None:
            pool = redis.ConnectionPool(host=self.host, port=self.port, db=0)
            self._pools[pool_key] = pool
        return redis.StrictRedis(connection_pool=pool)

    def redis_cache(self, key, data, ttl=3600):
        """Store ``data`` under ``key`` with an expiry of ``ttl`` seconds.

        The value and its expiry are written in a single SET command, so a
        failure cannot leave the key stored without an expiry.
        """
        self.r.set(key, data, ex=ttl)

    def get_access_from_redis(self, key):
        """Return the value stored under ``key``, or ``None``.

        ``None`` is returned when the key is missing or its value is empty.
        The key is read only once, so it cannot expire between a check and
        the actual read.
        """
        return self.r.get(key) or None

    def judge_redis_keys(self, key):
        """Return ``True`` if ``key`` currently exists, else ``False``."""
        return bool(self.r.exists(key))
        

def GetData():
    """Return the current request's JSON body as pretty-printed text.

    The raw body is decoded and parsed as JSON, then serialized again with
    a 4-space indent and non-ASCII characters left unescaped.
    """
    raw_body = request.get_data()
    parsed = json.loads(raw_body.decode())
    return json.dumps(parsed, indent=4, ensure_ascii=False)

# Flask application object; the webhook route in this file is registered on it.
app = Flask(__name__)
# JSON_AS_ASCII=False: Flask config key asking for non-ASCII characters to be
# emitted as-is in JSON output (NOTE(review): only honoured by Flask versions
# that still support this key -- confirm against the deployed version).
app.config.update(JSON_AS_ASCII=False)
 
 
def _parse_alert(alert):
    """Extract ``(supervisor, ip, content)`` from one alert dictionary.

    The ``vm_name`` label is split on ``-``: the second field is used as
    the supervisor name and the third is appended to ``172.16.`` to form
    the server address.

    Returns:
        The tuple described above, or ``None`` when the alert has no
        ``vm_name`` label with at least three ``-``-separated fields.
    """
    if not isinstance(alert, dict):
        return None
    labels = alert.get('labels') or {}
    hostname = labels.get('vm_name')
    fields = hostname.split("-") if isinstance(hostname, str) else []
    if len(fields) < 3:
        return None
    supervisor = fields[1]
    ip = "172.16." + fields[2]
    level = labels.get('severity', '')
    summary = (alert.get('annotations') or {}).get('summary', '')
    content = '''
        负责人:%s
        服务器地址:%s
        告警信息:%s
        严重程度:%s
        ''' % (supervisor, ip, summary, level)
    return supervisor, ip, content


@app.route('/webhook/', methods=['POST'])
def IssueCreate():
    """Handle a webhook POST and forward each alert to its supervisor.

    The access token and the user list are read from Redis and only
    fetched from the API (then cached) when the cache has no value. Each
    alert is sent to the user whose email is ``<supervisor>@qq.com``;
    alerts that cannot be parsed or have no matching user are skipped.

    Returns:
        ``("OK", 200)`` after processing, or a 400 response when the
        request body is not valid JSON.
    """
    qywx = qywx_obj()
    rds = redis_obj()

    # Read each cached value exactly once: checking for the key and reading
    # it separately would break if the key expired between the two calls.
    access_token_for_sendmsg = rds.get_access_from_redis("key_for_msg")
    if not access_token_for_sendmsg:
        access_token_for_sendmsg = qywx.access_token
        rds.redis_cache("key_for_msg", access_token_for_sendmsg)

    cached_users = rds.get_access_from_redis("key_for_info")
    if cached_users:
        # json.loads accepts both the bytes Redis returns and plain str.
        USER_INFO = json.loads(cached_users)
    else:
        USER_INFO = qywx.user_info
        rds.redis_cache("key_for_info", json.dumps(USER_INFO))

    try:
        description = json.loads(GetData())
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        return "invalid JSON body", 400
    alert_info = description.get('alerts') if isinstance(description, dict) else None

    # Index users by email once instead of scanning the list per alert; the
    # first user with a given email wins, matching a first-match scan.
    userid_by_email = {}
    for user in USER_INFO:
        email = user.get("email")
        userid = user.get("userid")
        if email and userid:
            userid_by_email.setdefault(email, userid)

    for alert in alert_info or []:
        parsed = _parse_alert(alert)
        if parsed is None:
            continue
        Server_supervisor, IP, content = parsed
        TOUSER = userid_by_email.get("%s@qq.com" % Server_supervisor)
        if TOUSER is None:
            continue
        print(TOUSER, IP)
        result = qywx.send_data(TOUSER, content, access_token_for_sendmsg)
        print(result)
    return "OK", 200
    
 
if __name__ == '__main__':
    # Debug mode enables Werkzeug's interactive debugger, which lets anyone
    # who can reach the port run arbitrary code. Since the server binds to a
    # network address, keep it off unless explicitly requested via
    # FLASK_DEBUG=1 (or "true") in the environment.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, host='172.16.40.24', port=6688)

  

原文地址:https://www.cnblogs.com/hel7512/p/13974126.html