# common

import datetime
import json
import logging
import logging.config
import math
import sys
from functools import wraps
from logging.handlers import RotatingFileHandler

import flask
import kubernetes
from aliyunsdkcore.client import AcsClient
from flask import current_app, g, got_request_exception
from flask_restful import reqparse, Resource, Api, http_status_message
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
from kubernetes.client import AppsV1Api, ExtensionsV1beta1Api
from kubernetes.client import V1Pod
from kubernetes.client.rest import ApiException
from sqlalchemy import text
from werkzeug.datastructures import Headers
from werkzeug.exceptions import BadRequest, HTTPException

from utils.email_tool import send_email
from utils.exceptions import ParameterError
from utils.response import unauthorized_response
from utils.short_message_tool import send_sms


def generate_auth_token(data, expiration=3600 * 24 * 15):
    """Serialize ``data`` into a signed, time-limited token string.

    :param data: payload to embed in the token
    :param expiration: token lifetime in seconds (the default is 15 days)
    :return: the signed token, decoded from UTF-8 bytes to ``str``
    """
    secret_key = current_app.config['SECRET_KEY']
    serializer = Serializer(secret_key, expires_in=expiration)
    token_bytes = serializer.dumps(data)
    return token_bytes.decode('utf-8')


def authorization(operation_set: set = None, role_set: set = None, menu_set: set = None,
                  certification_type: str = "competence", without_env: bool = True):
    """
    Build a decorator that authenticates the ``X-Token`` header and then checks permissions.

    :param operation_set: operation keys the endpoint requires; the user must hold all of them
    :param role_set: role keys accepted by the endpoint; holding any one of them is enough
    :param menu_set: menu keys the endpoint requires
    :param certification_type: which check to run: "competence" (operation based),
        "role" (role based) or "menu" (menu based)
    :param without_env: when False, the operation check is scoped to the request's ``env`` argument
    :return: a decorator for resource methods
    """

    def login_required(function):
        @wraps(function)
        def wrap(self, *args, **kwargs):
            request_headers = flask.request.headers
            if 'X-Token' not in request_headers:
                return unauthorized_response()

            # Verify the token signature and expiry.
            raw_token = request_headers.get('X-Token')
            serializer = Serializer(current_app.config['SECRET_KEY'])
            try:
                payload = serializer.loads(raw_token.encode("utf-8"))
            except SignatureExpired:
                return unauthorized_response(message="认证过期")
            except BadSignature:
                return unauthorized_response(message="认证失败")
            except Exception as error:
                current_app.logger.error(str(error))
                return unauthorized_response(message="认证异常")

            # The token is valid; make sure the user still exists and is enabled.
            user_rows = self.get_query_results(
                "select user_name, status from user where user_id=:user_id limit 1;",
                user_id=payload["user_id"],
                fetch_result=True
            )
            if not user_rows:
                return unauthorized_response(message="用户不存在")
            user_row = user_rows[0]
            if not user_row[1]:
                return unauthorized_response(message="用户已被禁用")

            # Publish the identity on ``g`` before the permission check, which reads it.
            g.user_id = payload["user_id"]
            g.user_name = user_row[0]
            permitted = check_competence(
                self, operation_set=operation_set, role_set=role_set, menu_set=menu_set,
                certification_type=certification_type, without_env=without_env
            )
            if not permitted:
                raise ParameterError("权限不足")
            return function(self, *args, **kwargs)

        return wrap

    return login_required


def check_competence(
        current_handler, operation_set: set = None, role_set: set = None, menu_set: set = None,
        certification_type: str = "competence", without_env: bool = True
):
    """
    Decide whether the user stored in ``g.user_id`` satisfies the requested permission check.

    :param current_handler: view instance handling the current request; its ``args["env"]``
        is read when ``without_env`` is False
    :param operation_set: operation keys the endpoint requires (all of them must be held)
    :param menu_set: menu keys the endpoint requires (all of them must be held)
    :param role_set: role keys accepted by the endpoint (any one is enough)
    :param certification_type: "competence" (operation based), "role" (role based) or "menu"
    :param without_env: when False, operations are filtered by the request's environment
    :return: True when the check passes or when there is nothing to check, otherwise False
    :raises ValueError: for an unknown ``certification_type``
    """
    # Operation-based control.
    if certification_type == "competence":
        if not operation_set:
            return True
        request_env = None if without_env else current_handler.args["env"]
        user_role_ids = get_user_role_ids(g.user_id)
        granted_operations = get_user_operation_keys(user_role_ids, request_env)
        return operation_set.issubset(granted_operations)

    # Role-based control.
    if certification_type == "role":
        if not role_set:
            return True
        shared_roles = get_user_role_keys(g.user_id) & role_set
        return bool(shared_roles)

    # Menu-based control.
    if certification_type == "menu":
        if not menu_set:
            return True
        user_role_ids = get_user_role_ids(g.user_id)
        granted_menus = get_user_menu_keys(user_role_ids)
        return menu_set.issubset(granted_menus)

    raise ValueError("暂不支持该类型权限认证")


def get_user_role_ids(user_id):
    """Return the ``role_id`` of every ``role_user`` row that belongs to ``user_id``."""
    sql = "select role_id from role_user where user_id=:user_id;"
    rows = TransactionResource.get_query_results(sql, user_id=user_id)
    return [row[0] for row in rows]


def get_user_role_keys(user_id):
    """Return the set of ``role_key`` values for the roles assigned to ``user_id``.

    An empty set is returned without querying ``role`` when the user has no roles.
    """
    user_role_ids = get_user_role_ids(user_id)
    if not user_role_ids:
        return set()
    sql = """
        select role_key from role where role_id in :role_ids;
    """
    rows = TransactionResource.get_query_results(sql, role_ids=user_role_ids)
    return {row[0] for row in rows}


def get_user_operation_keys(role_ids, env=None):
    """Return the set of operation keys granted to the given roles.

    :param role_ids: role ids to look up; an empty value yields an empty set without a query
    :param env: when truthy, only operations whose ``env`` column matches are returned
    :return: set of ``operation_key`` values
    """
    if not role_ids:
        return set()
    sql = """
        select gb_operation.operation_key from role_operation 
        inner join gb_operation on role_operation.operation_id = gb_operation.operation_id 
        where role_operation.role_id in :role_ids
    """
    if env:
        sql = sql + " and gb_operation.env = :env;"
    rows = TransactionResource.get_query_results(sql, role_ids=role_ids, env=env)
    return {row[0] for row in rows}


def get_user_menu_keys(role_ids):
    """Return the set of menu keys granted to the given roles.

    :param role_ids: role ids to look up; an empty value yields an empty set without a query
    :return: set of ``menu_key`` values
    """
    if not role_ids:
        return set()
    sql = """
            select gb_menu.menu_key from role_menu 
            inner join gb_menu on role_menu.menu_id = gb_menu.menu_id 
            where role_menu.role_id in :role_ids
        """
    rows = TransactionResource.get_query_results(sql, role_ids=role_ids)
    return {row[0] for row in rows}


def transaction(sql, bindparams=None, commit=True, fetch_result=False, **kwargs):
    """
    Execute a textual SQL statement on the application's session.

    @param sql: SQL text; may contain ``:name`` placeholders
    @param bindparams: optional list of ``bindparam`` objects that type or constrain the
        placeholders, e.g. [bindparam('alarm_time_param', type_=DateTime, required=True)]
    @param commit: commit the session after the statement has executed
    @param fetch_result: when true return the rows as a list (``None`` if ``rowcount`` is 0);
        otherwise return the result object itself
    @param kwargs: values for the placeholders in ``sql``
    @return: a list of rows, ``None``, or the result object (see ``fetch_result``)
    """
    session = current_app.db.session
    try:
        # ``bindparams`` is not the second positional parameter of ``text()``;
        # attach the definitions through ``TextClause.bindparams()`` instead.
        statement = text(sql)
        if bindparams:
            statement = statement.bindparams(*bindparams)
        results = session.execute(statement, kwargs)
        if commit:
            session.commit()
    except Exception as e:
        current_app.logger.error(str(e))
        session.rollback()  # undo whatever the failed statement left pending
        raise
    if not fetch_result:
        return results
    if results.rowcount:
        return list(results)
    return None


def with_transaction(function):
    """Decorator that runs a handler method inside one engine-level transaction.

    The connection and transaction are exposed on ``g`` and on the handler instance
    (``conn`` / ``trans``). The transaction is committed when the method returns and
    rolled back, after logging, when it raises; the exception is then re-raised.
    """

    @wraps(function)
    def wrap(self, *args, **kwargs):
        with current_app.db.engine.connect() as connection:
            db_transaction = connection.begin()
            g.trans = self.trans = db_transaction
            g.conn = self.conn = connection
            try:
                response = function(self, *args, **kwargs)
                db_transaction.commit()
            except Exception as error:
                current_app.logger.error(str(error))
                db_transaction.rollback()
                raise error
            return response

    return wrap


def do_nothing(value):
    """Identity function: hand ``value`` back unchanged."""
    unchanged = value
    return unchanged


def check_args(**kwargs):
    """Decorator factory that parses and validates request arguments for a handler method.

    Each keyword names a destination key and maps it to a spec string of the form
    ``[?]type[:help[:locations[:request_name]]]``:

    * a leading ``?`` marks the argument as optional;
    * ``type`` is ``list`` (values are appended unchanged) or an expression evaluated
      to obtain the converter, e.g. ``int`` or ``str``;
    * ``help`` is the error text shown when the argument is rejected;
    * ``locations`` is a comma separated list of request locations (default ``json,values``);
    * ``request_name`` is the name used in the request when it differs from the keyword.

    Keywords already supplied through the URL (present in the wrapped call's keyword
    arguments) are not parsed from the request. The merged arguments are stored on
    ``self.args`` before the wrapped method is called.

    :raises ParameterError: when request parsing fails with a ``BadRequest`` carrying ``data``
    """

    def my_decorator(func):
        @wraps(func)
        def wrapper(self, *_args, **_kwargs):
            parser = reqparse.RequestParser()
            # Register body / query-string arguments.
            for key, value in kwargs.items():
                if key in _kwargs:
                    # Already provided by the URL rule.
                    continue
                required = not value.startswith("?")
                # ``~required`` was bitwise NOT on a bool (-2 / -1) and therefore always
                # truthy; conversion errors are meant to be ignored only for optional args.
                ignore = not required
                items = value.lstrip("?").split(":")
                # Pad so that every optional section can be read unconditionally.
                _type_str, help_text, location_text, request_name = (items + [""] * 3)[:4]
                _help = help_text or f"{key}为必须参数"
                if location_text:
                    location = tuple(location_text.split(","))
                else:
                    location = ('json', 'values',)
                if request_name:
                    name, dest = request_name, key
                else:
                    name, dest = key, None
                if _type_str == "list":
                    _type = do_nothing
                    action = "append"
                    default = list  # callable default: a fresh list per request
                else:
                    # NOTE(review): ``eval`` is applied to the decorator's spec string.
                    # That is developer-written code, but it must never be fed request data.
                    _type = eval(_type_str)
                    action = "store"
                    default = None
                parser.add_argument(
                    name, dest=dest, type=_type, help=_help, required=required, action=action,
                    default=default, ignore=ignore, location=location
                )
            try:
                args = parser.parse_args()
            except BadRequest as e:
                if not hasattr(e, "data"):
                    raise
                messages = e.data.get("message", {})
                # Prefer the help texts; fall back to the offending argument names.
                msg = ",".join(messages.values()) or ",".join(messages.keys())
                raise ParameterError(msg)
            # Add the arguments parsed from the URL.
            args.update(_kwargs)
            self.args = args
            current_app.logger.debug('请求参数:{}'.format(json.dumps(args, ensure_ascii=False, indent=4)))
            result = func(self, *_args, **_kwargs)
            current_app.logger.debug(
                '响应参数:{}'.format(json.dumps(result, ensure_ascii=False, indent=4)))
            return result

        return wrapper

    return my_decorator


class QueryMixin:
    """Helpers for running textual SQL on ``current_app.db.session`` and shaping the rows.

    The query helpers log the executed statement through the DB-API cursor's
    ``_executed`` attribute and therefore depend on the driver exposing it.
    """

    # Default column list used by the serialize_* helpers and build_query_sql.
    select_fields = None
    # Not read by any method of this mixin.
    table = None

    @staticmethod
    def _build_statement(sql, bindparams=None):
        """Wrap ``sql`` in ``text()`` and attach the optional ``bindparam`` definitions."""
        statement = text(sql)
        if bindparams:
            # ``bindparams`` is not the second positional parameter of ``text()``;
            # TextClause.bindparams() is the supported way to attach them.
            statement = statement.bindparams(*bindparams)
        return statement

    @classmethod
    def _run_statement(cls, sql, bindparams, params):
        """Execute ``sql`` with ``params`` on the application's session."""
        return current_app.db.session.execute(cls._build_statement(sql, bindparams), params)

    @staticmethod
    def _log_executed(cursor):
        """Write the statement recorded on ``cursor`` to the debug log."""
        current_app.logger.debug(cursor._executed.decode())

    @classmethod
    def get_query_results(cls, sql, bindparams=None, fetch_result=False, **kwargs):
        """Run a query.

        :return: the result object, or when ``fetch_result`` is true a list of rows
            (empty when ``rowcount`` is 0)
        """
        results = cls._run_statement(sql, bindparams, kwargs)
        cls._log_executed(results.cursor)
        if not fetch_result:
            return results
        return list(results) if results.rowcount else []

    @classmethod
    def get_single_field_query_result(cls, sql, bindparams=None, **kwargs):
        """Return the first column of the first row, or ``None`` when there are no rows."""
        results = cls._run_statement(sql, bindparams, kwargs)
        cls._log_executed(results.cursor)
        for row in results:
            return row[0]
        return None

    @classmethod
    def get_single_field_query_results(cls, sql, bindparams=None, **kwargs):
        """Return the first column of every row as a list."""
        results = cls._run_statement(sql, bindparams, kwargs)
        cls._log_executed(results.cursor)
        return [row[0] for row in results]

    @classmethod
    def update_or_insert(cls, sql, bindparams=None, **kwargs):
        """Execute a write statement and return the result object."""
        results = cls._run_statement(sql, bindparams, kwargs)
        cursor = results._saved_cursor or results.cursor
        if cursor:
            # Log through the cursor that was actually found; ``_saved_cursor`` may be
            # unset when the fallback ``results.cursor`` was the one selected above.
            cls._log_executed(cursor)
        return results

    @staticmethod
    def _format_value(value):
        """Render datetimes/dates as strings; return every other value unchanged."""
        # ``datetime`` must be tested first because it is a subclass of ``date``.
        if isinstance(value, datetime.datetime):
            return value.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(value, datetime.date):
            return value.strftime('%Y-%m-%d')
        return value

    @classmethod
    def _serialize_row(cls, row, select_fields, rename_map):
        """Map one positional row to a dict keyed by the (renamed) field names."""
        data = dict()
        for index, field in enumerate(select_fields):
            # An explicit rename wins; otherwise drop the table qualifier ("t.col" -> "col").
            name = rename_map.get(field) or field.split(".")[-1]
            data[name] = cls._format_value(row[index])
        return data

    @classmethod
    def serialize_results(cls, results, select_fields=None, is_multi=False, rename_map=None):
        """Serialize rows to a list of dicts (``is_multi``) or to a single dict."""
        if is_multi:
            return cls.serialize_multi_results(results, select_fields=select_fields, rename_map=rename_map)
        return cls.serialize_single_results(results, select_fields=select_fields, rename_map=rename_map)

    @classmethod
    def serialize_single_results(cls, results, select_fields=None, rename_map=None):
        """Serialize rows into one dict; when several rows are given the last one wins.

        :raises AssertionError: when neither ``select_fields`` nor ``cls.select_fields`` is set
        """
        select_fields = select_fields or cls.select_fields
        if not select_fields:
            # Raised explicitly so the check is not stripped when running under ``-O``.
            raise AssertionError("请检查需要序列化的字段")
        rename_map = rename_map or dict()
        data = dict()
        for result in results:
            data.update(cls._serialize_row(result, select_fields, rename_map))
        return data

    @classmethod
    def serialize_multi_results(cls, results, select_fields=None, rename_map=None):
        """Serialize every row into its own dict and return them as a list."""
        select_fields = select_fields or cls.select_fields
        rename_map = rename_map or dict()
        return [cls._serialize_row(result, select_fields, rename_map) for result in results]

    @classmethod
    def get_query_single_result(
            cls, sql, bindparams=None, fetch_result=False, select_fields=None, serialize=True, rename_map=None, **kwargs
    ):
        """Run a query and, unless ``serialize`` is false, serialize it into a single dict.

        ``fetch_result`` only takes effect when ``serialize`` is false.
        """
        if serialize:
            fetch_result = False
        results = cls.get_query_results(sql=sql, bindparams=bindparams, fetch_result=fetch_result, **kwargs)
        if serialize:
            return cls.serialize_single_results(results, select_fields=select_fields, rename_map=rename_map)
        return results

    @classmethod
    def get_last_insert_id(cls):
        """Return the value of ``SELECT LAST_INSERT_ID()``, or ``None`` when no row comes back."""
        results = cls.get_query_results("SELECT LAST_INSERT_ID();", fetch_result=True)
        if results:
            return results[0][0]
        return None

    @classmethod
    def group_data_list(cls, data_list: list, key) -> list:
        """Group dicts by ``data[key]``.

        :return: ``[{key: value, "values": [...]}, ...]`` ordered by ascending ``value``;
            members keep their input order
        """
        grouped = {value: list() for value in sorted({data[key] for data in data_list})}
        for data in data_list:
            grouped[data[key]].append(data)
        return [{key: value, "values": members} for value, members in grouped.items()]

    @classmethod
    def get_limit_sql_suffix(cls, page_size, current_page):
        """Return the `` limit offset, size;`` suffix for a 1-based ``current_page``."""
        return f" limit {(current_page - 1) * page_size}, {page_size};"

    @classmethod
    def build_query_sql(cls, sql_template: str, select_fields: list = None, page_size: int = 0, current_page=None):
        """Expand ``{fields}`` in ``sql_template`` into a data query and a count query.

        :param sql_template: SQL containing a ``{fields}`` placeholder and no trailing ``;``
        :param select_fields: columns to select; defaults to ``cls.select_fields``
        :param page_size: rows per page; a falsy value disables the limit clause
        :param current_page: 1-based page number; ``None`` means the first page
        :return: ``(sql, count_sql)``
        """
        if current_page is None:
            current_page = 1
        select_fields = select_fields or cls.select_fields
        sql = sql_template.format(fields=",".join(select_fields))
        count_sql = sql_template.format(fields="count(1)") + ";"
        if page_size:
            sql += cls.get_limit_sql_suffix(page_size, current_page)
        else:
            sql += ";"
        return sql, count_sql

    @classmethod
    def get_page_info(cls, page_size, current_page, total_count):
        """Describe the pagination state; a falsy ``page_size`` means one page holding everything."""
        if current_page is None:
            current_page = 1
        if not page_size:
            return {
                "total": total_count,
                "size": total_count,
                "pages": 1,
                "current": 1
            }
        return {
            "total": total_count,
            "size": page_size,
            "pages": math.ceil(total_count / page_size),
            "current": current_page
        }

    @classmethod
    def get_config_value_by_key(cls, config_key):
        """Return ``config_value`` of the ``gb_config`` row with this key (``None`` if absent)."""
        sql = "select config_value from gb_config where config_key=:config_key;"
        return cls.get_single_field_query_result(sql, config_key=config_key)

    @staticmethod
    def paginate(collections, page=None, per_page=None) -> tuple:
        """Return the ``per_page`` items of 1-based page ``page``.

        :param collections: sized, sliceable sequence
        :param page: page number (default 1); converted with ``int``
        :param per_page: page size (default 20); converted with ``int``
        :return: ``(items, total)``
        """
        page = 1 if page is None else int(page)
        per_page = 20 if per_page is None else int(per_page)
        start = (page - 1) * per_page
        # Slicing already clamps an end index that lies beyond the sequence.
        return collections[start: page * per_page], len(collections)


class TransactionResource(Resource, QueryMixin):
    """Resource base class that rolls the DB session back when a request handler raises."""

    def dispatch_request(self, *args, **kwargs):
        """Dispatch through the parent class; on any exception roll back, log and re-raise."""
        parent_dispatch = super().dispatch_request
        try:
            response = parent_dispatch(*args, **kwargs)
        except Exception as error:
            current_app.db.session.rollback()
            current_app.logger.error(str(error))
            raise error
        return response


# 重写API类,目的是做到正确返回错误码,否则默认是500
class CustomizeApi(Api):
    """``Api`` subclass whose error handler honours the status and message registered in ``errors``."""

    def handle_error(self, e):
        """Error handler for the API transforms a raised exception into a Flask
        response, with the appropriate HTTP status code and body.

        When the exception's class name is registered in ``self.errors``, the
        registered ``status`` replaces the status code, and the exception's own
        ``description`` (when it has a non-empty one) replaces the registered message.

        :param e: the raised Exception object
        :type e: Exception
        :return: the response built by ``make_response``

        """
        got_request_exception.send(current_app._get_current_object(), exception=e)

        if not isinstance(e, HTTPException) and current_app.propagate_exceptions:
            exc_type, exc_value, tb = sys.exc_info()
            if exc_value is e:
                raise
            else:
                raise e

        headers = Headers()
        if isinstance(e, HTTPException):
            code = e.code
            default_data = {
                'message': getattr(e, 'description', http_status_message(code))
            }
            headers = e.get_response().headers
        else:
            code = 500
            default_data = {
                'message': http_status_message(code),
            }

        # Werkzeug exceptions generate a content-length header which is added
        # to the response in addition to the actual content-length header
        # https://github.com/flask-restful/flask-restful/issues/534
        remove_headers = ('Content-Length',)

        for header in remove_headers:
            headers.pop(header, None)

        data = getattr(e, 'data', default_data)

        if code and code >= 500:
            exc_info = sys.exc_info()
            if exc_info[1] is None:
                exc_info = None
            current_app.log_exception(exc_info)

        error_cls_name = type(e).__name__
        if error_cls_name in self.errors:
            # Work on a copy: the registered entry is shared by every request, so writing
            # this exception's message into it would leak the text into later responses.
            custom_data = dict(self.errors.get(error_cls_name, {}))
            code = custom_data.get('status', code)
            # Exceptions that are not HTTPExceptions have no ``description`` attribute.
            description = getattr(e, 'description', None)
            if description:
                custom_data["message"] = description
            data.update(custom_data)

        if code == 406 and self.default_mediatype is None:
            # if we are handling NotAcceptable (406), make sure that
            # make_response uses a representation we support as the
            # default mediatype (so that make_response doesn't throw
            # another NotAcceptable error).
            supported_mediatypes = list(self.representations.keys())
            fallback_mediatype = supported_mediatypes[0] if supported_mediatypes else "text/plain"
            resp = self.make_response(
                data,
                code,
                headers,
                fallback_mediatype=fallback_mediatype
            )
        else:
            resp = self.make_response(data, code, headers)

        if code == 401:
            resp = self.unauthorized(resp)
        return resp


class K8SBase:
    """Holds the Kubernetes API clients and the identifiers of one application in one cluster."""

    def __init__(self, app_name, group_name, k8s_url, k8s_token, namespace, zone_code):
        """Store the connection settings, build the API clients, then store the identifiers."""
        # get_config() reads the URL and token, so they have to be set first.
        self.k8s_url, self.k8s_token = k8s_url, k8s_token
        self.config = self.get_config()
        self.core_v1 = self.get_core_v1()
        self.apps_v1 = self.get_apps_v1()
        self.extensions_v1 = self.get_extensions_v1_beta1()
        self.namespace, self.zone_code = namespace, zone_code
        self.app_name, self.group_name = app_name, group_name

    def get_config(self):
        """Build an ``ApiClient`` using bearer-token authentication.

        :return: the client, or ``None`` when building it raised ``TypeError``
            (for instance when the token is not a string); the error is logged
        """
        try:
            settings = kubernetes.client.Configuration()
            settings.host = self.k8s_url
            # NOTE(review): TLS certificate verification is switched off here.
            settings.verify_ssl = False
            # Plain concatenation on purpose: a non-string token raises the TypeError handled below.
            settings.api_key = {"authorization": "Bearer " + self.k8s_token}
            return kubernetes.client.api_client.ApiClient(configuration=settings)
        except TypeError as error:
            current_app.logger.error(str(error))
            return None

    def get_core_v1(self):
        """Return a ``CoreV1Api`` bound to ``self.config``."""
        return kubernetes.client.apis.core_v1_api.CoreV1Api(self.config)

    def get_extensions_v1_beta1(self):
        """Return an ``ExtensionsV1beta1Api`` bound to ``self.config``."""
        return ExtensionsV1beta1Api(self.config)

    def get_apps_v1(self):
        """Return an ``AppsV1Api`` bound to ``self.config``."""
        return AppsV1Api(self.config)


class K8SPodInfoTool(K8SBase):
    """Read-only helpers that summarize pods."""

    @classmethod
    def get_pod_restart_count(cls, pod: V1Pod):
        """Return the restart count of the pod's first container status, or 0 when there is none."""
        statuses = pod.status.container_statuses
        return statuses[0].restart_count if statuses else 0

    @classmethod
    def get_image_version(cls, pod: V1Pod):
        """Return the image tag of the pod's first container status.

        The tag is looked up in the last path segment of the image reference, so a
        registry host carrying a port ("registry:5000/app:v1") is not mistaken for the tag.

        :return: the tag, ``None`` when the image has no tag, or "" when the pod has
            no container statuses
        """
        statuses = pod.status.container_statuses
        if not statuses:
            return ""
        image = statuses[0].image
        if not image:
            return None
        image_name = image.rsplit("/", 1)[-1]
        return image_name.split(":")[1] if ":" in image_name else None

    @classmethod
    def get_pod_info(cls, pod):
        """Summarize name, IP, image tag, phase, conditions and restart count of ``pod``."""
        conditions = pod.status.conditions
        if conditions:
            condition_text = ";".join(f"{item.type}: {item.status}" for item in conditions)
        else:
            condition_text = ""
        return {
            "pod_name": pod.metadata.name,
            "pod_ip": pod.status.pod_ip,
            "app_version": cls.get_image_version(pod),
            "phase": pod.status.phase,
            "conditions": condition_text,
            "restart_count": cls.get_pod_restart_count(pod)
        }

    def sync_deployment_pods_info(self, match_labels):
        """
        List the pods in ``self.namespace`` that match every label and summarize each one.

        :param match_labels: mapping of label name to label value
        :return: list of dicts as produced by ``get_pod_info``
        """
        labels = ",".join(f"{key}={value}" for key, value in match_labels.items())
        results = self.core_v1.list_namespaced_pod(namespace=self.namespace, label_selector=labels, watch=False)
        return [self.get_pod_info(pod) for pod in results.items]


class K8SCDTool(K8SBase):
    """Deployment / service / ingress operations against ``self.namespace``.

    Return conventions used by the methods below:

    * ``get_*`` and ``list_*`` return ``(True, result)`` or ``(False, message)``;
    * deployment writes return ``(3, result)`` (releasing) or ``(2, message)`` (failed);
    * service / ingress operations return ``(1, result)`` or ``(2, exception)``.
    """

    @staticmethod
    def _api_error_message(exc, default=None):
        """Extract ``message`` from the JSON body of an ``ApiException``.

        :param exc: the caught exception
        :param default: value returned when no message can be extracted; the exception
            itself is used when this is ``None``
        """
        fallback = exc if default is None else default
        try:
            error = json.loads(exc.body)
        except (TypeError, ValueError):
            # The body is missing or is not JSON.
            return fallback
        if not isinstance(error, dict):
            return fallback
        message = error.get("message")
        return fallback if message is None else message

    def get_deploy_info(self, deploy_name):
        """Read a deployment."""
        try:
            result = self.apps_v1.read_namespaced_deployment(deploy_name, self.namespace)
        except ApiException as e:
            current_app.logger.error('获取deployment信息失败:{}'.format(e))
            return False, self._api_error_message(e)
        return True, result

    def get_deploy_info_status(self, deploy_name):
        """Read the status of a deployment."""
        try:
            result = self.apps_v1.read_namespaced_deployment_status(deploy_name, self.namespace)
        except ApiException as e:
            current_app.logger.error('获取deployment信息失败:{}'.format(e))
            return False, self._api_error_message(e)
        return True, result

    def get_pod_status(self, k8s_pod_tool, match_label):
        """Read the status of every pod matching ``match_label``.

        :param k8s_pod_tool: object providing ``sync_deployment_pods_info``
        :param match_label: mapping of label name to value
        :return: ``(True, {pod_name: status})`` or ``(False, message)``
        """
        try:
            pod_list = k8s_pod_tool.sync_deployment_pods_info(match_label)
            pods_status = {}
            for pod_info in pod_list:
                pod_name = pod_info.get('pod_name')
                pods_status[pod_name] = self.core_v1.read_namespaced_pod_status(pod_name, self.namespace)
        except ApiException as e:
            current_app.logger.error('获取Pod信息失败:{}'.format(e))
            return False, self._api_error_message(e)
        return True, pods_status

    def get_namespace_pod(self, deploy_name):
        """Read the pod named ``deploy_name``."""
        try:
            result = self.core_v1.read_namespaced_pod(deploy_name, self.namespace)
        except ApiException as e:
            # This reads a pod, so the log message says so.
            current_app.logger.error('获取Pod信息失败:{}'.format(e))
            return False, self._api_error_message(e)
        return True, result

    def update_deploy(self, name, body):
        """Replace a deployment; HTTP 422 responses are reported with a readable message."""
        try:
            result = self.apps_v1.replace_namespaced_deployment(name, self.namespace, body)
            return 3, result  # 3: releasing
        except ApiException as e:
            current_app.logger.error('更新Deployment失败:{}'.format(e))
            if e.status != 422:
                return 2, e  # 2: release failed
            message = self._api_error_message(e, default="")
            if "MatchLabels" in message and "field is immutable" in message:
                return 2, "检测到应用名称发生改变,需删除Deployment后重新发布"
            return 2, message

    def patch_deploy(self, name, patch_body):
        """Patch a deployment."""
        try:
            result = self.apps_v1.patch_namespaced_deployment(name, self.namespace, patch_body)
            return 3, result  # 3: releasing
        except ApiException as e:
            current_app.logger.error('更新Deployment失败:{}'.format(e))
            return 2, self._api_error_message(e)  # 2: release failed

    def delete_deploy(self, name):
        """Delete a deployment."""
        try:
            result = self.apps_v1.delete_namespaced_deployment(name, self.namespace)
            return 3, result  # 3: releasing
        except ApiException as e:
            current_app.logger.error('删除Deployment失败:{}'.format(e))
            return 2, self._api_error_message(e)  # 2: release failed

    def create_deploy(self, body):
        """Create a deployment."""
        try:
            result = self.apps_v1.create_namespaced_deployment(self.namespace, body)
            return 3, result  # 3: releasing
        except ApiException as e:
            current_app.logger.error('发布失败:{}'.format(e))
            return 2, self._api_error_message(e)  # 2: release failed

    def add_pod_label(self, label, pod_name_list):
        """Set the ``name`` label on every pod selected by ``label``.

        The i-th selected pod receives ``pod_name_list[i][0]``. API errors are logged
        and end the loop.
        """
        try:
            # NOTE(review): pods are listed across all namespaces but read and patched in
            # self.namespace; a match in another namespace ends the loop with an
            # ApiException. Confirm whether list_namespaced_pod was intended.
            result = self.core_v1.list_pod_for_all_namespaces(label_selector=label)
            for index, pod_info in enumerate(result.items):
                pod_name = pod_info.metadata.name
                pod_res = self.core_v1.read_namespaced_pod(pod_name, self.namespace)
                pod_res.metadata.labels['name'] = pod_name_list[index][0]
                self.core_v1.patch_namespaced_pod(pod_name, self.namespace, pod_res)
        except ApiException as e:
            current_app.logger.error('添加名称标签异常:{}'.format(e))

    def get_service_info(self, service_name):
        """Read a service."""
        try:
            result = self.core_v1.read_namespaced_service(service_name, self.namespace)
        except ApiException as e:
            return 2, e
        return 1, result

    def create_service(self, body):
        """Create a service."""
        try:
            result = self.core_v1.create_namespaced_service(self.namespace, body)
            return 1, result
        except Exception as e:
            current_app.logger.error('创建 service 失败:{}'.format(e))
            return 2, e

    def update_service(self, service_name, body):
        """Patch a service.

        Patching merges by default; a replace has to be requested explicitly,
        see http://jsonpatch.com/
        """
        try:
            result = self.core_v1.patch_namespaced_service(service_name, self.namespace, body)
            return 1, result
        except Exception as e:
            current_app.logger.error('更新 service 失败:{}'.format(e))
            return 2, e

    def delete_service(self, service_name):
        """Delete a service."""
        try:
            result = self.core_v1.delete_namespaced_service(service_name, self.namespace)
            return 1, result
        except Exception as e:
            current_app.logger.error('删除 service 失败:{}'.format(e))
            return 2, e

    def create_ingress(self, body):
        """Create an ingress rule."""
        try:
            result = self.extensions_v1.create_namespaced_ingress(self.namespace, body)
            return 1, result
        except Exception as e:
            current_app.logger.error('创建 ingress 失败:{}'.format(e))
            return 2, e

    def update_ingress(self, ingress_name, body):
        """Patch an ingress rule."""
        try:
            result = self.extensions_v1.patch_namespaced_ingress(ingress_name, self.namespace, body)
            return 1, result
        except Exception as e:
            current_app.logger.error('更新 ingress 失败:{}'.format(e))
            return 2, e

    def delete_ingress(self, ingress_name):
        """Delete an ingress rule."""
        try:
            result = self.extensions_v1.delete_namespaced_ingress(ingress_name, self.namespace)
            return 1, result
        except Exception as e:
            current_app.logger.error('删除 ingress 失败:{}'.format(e))
            return 2, e

    def check_service_exists(self, service_name):
        """Return the service object, ``False`` on HTTP 404, or ``None`` on any other API error."""
        try:
            result = self.core_v1.read_namespaced_service(service_name, self.namespace)
        except ApiException as e:
            if e.status == 404:
                return False
            # Still answers None as before, but the failure is no longer silent.
            current_app.logger.error('检查 service 失败:{}'.format(e))
            return None
        return result

    def check_deployment_exists(self, deployment_name):
        """Return ``True`` if it exists, ``False`` on HTTP 404, ``None`` on any other API error."""
        try:
            self.apps_v1.read_namespaced_deployment(deployment_name, self.namespace)
        except ApiException as e:
            if e.status == 404:
                return False
            # Still answers None as before, but the failure is no longer silent.
            current_app.logger.error('检查 deployment 失败:{}'.format(e))
            return None
        return True

    def check_ingress_exists(self, ingress_name):
        """Return ``True`` if it exists, ``False`` on HTTP 404, ``None`` on any other API error."""
        try:
            self.extensions_v1.read_namespaced_ingress(ingress_name, self.namespace)
        except ApiException as e:
            if e.status == 404:
                return False
            # Still answers None as before, but the failure is no longer silent.
            current_app.logger.error('检查 ingress 失败:{}'.format(e))
            return None
        return True

    def list_namespaced_persistent_volume_claim(self):
        """List the persistent volume claims of the namespace."""
        try:
            result = self.core_v1.list_namespaced_persistent_volume_claim(self.namespace)
            return True, result
        except ApiException as e:
            message = self._api_error_message(e)
            current_app.logger.error('获取存储声明失败:{}'.format(message))
            return False, message

    def list_namespaced_config_map(self):
        """List the config maps of the namespace."""
        try:
            result = self.core_v1.list_namespaced_config_map(self.namespace)
            return True, result
        except ApiException as e:
            message = self._api_error_message(e)
            current_app.logger.error('获取 config map 失败:{}'.format(message))
            return False, message

    def list_namespaced_secret_map(self):
        """List the secrets of the namespace."""
        try:
            result = self.core_v1.list_namespaced_secret(self.namespace)
            return True, result
        except ApiException as e:
            message = self._api_error_message(e)
            current_app.logger.error('获取 secret 失败:{}'.format(message))
            return False, message


def get_k8s_cd_tool(app_name, group_name, system_code, env, namespace, zone_code) -> K8SCDTool:
    """
    Build a ``K8SCDTool`` by delegating to ``get_k8s_tool``.

    All arguments are forwarded unchanged; ``K8SCDTool`` is supplied as the
    class to instantiate.
    """
    return get_k8s_tool(
        app_name=app_name,
        group_name=group_name,
        system_code=system_code,
        env=env,
        namespace=namespace,
        zone_code=zone_code,
        k8s_class=K8SCDTool,
    )


def get_k8s_info_tool(app_name, group_name, system_code, env, namespace, zone_code) -> K8SPodInfoTool:
    """
    Build a ``K8SPodInfoTool`` by delegating to ``get_k8s_tool``.

    All arguments are forwarded unchanged; ``K8SPodInfoTool`` is supplied as
    the class to instantiate.
    """
    return get_k8s_tool(
        app_name=app_name,
        group_name=group_name,
        system_code=system_code,
        env=env,
        namespace=namespace,
        zone_code=zone_code,
        k8s_class=K8SPodInfoTool,
    )


def get_k8s_tool(app_name, group_name, system_code, env, namespace, zone_code, k8s_class):
    """
    Instantiate ``k8s_class`` with the cluster URL and token configured for a zone.

    The URL and token are read through ``TransactionResource.get_config_value_by_key``
    under the keys ``K8S_<system_code>_<env>_<zone_code>_URL`` and ``..._TOKEN``.

    :param app_name: forwarded to ``k8s_class``
    :param group_name: forwarded to ``k8s_class``
    :param system_code: part of the configuration key
    :param env: part of the configuration key
    :param namespace: forwarded to ``k8s_class``
    :param zone_code: part of the configuration key, also forwarded to ``k8s_class``
    :param k8s_class: callable invoked as
                      ``k8s_class(app_name, group_name, url, token, namespace, zone_code)``
    :return: the object returned by ``k8s_class``
    :raises ValueError: when the URL value is falsy (the handler below only
                        catches ``ApiException``, so this propagates as-is)
    :raises ParameterError: when an ``ApiException`` is raised inside the ``try``
    """
    key_prefix = f"K8S_{system_code}_{env}_{zone_code}"
    url_key = f"{key_prefix}_URL"
    try:
        api_url = TransactionResource.get_config_value_by_key(url_key)
        if not api_url:
            raise ValueError(f'{url_key} 未配置')
        # The token is not validated here; it is passed on whatever its value.
        api_token = TransactionResource.get_config_value_by_key(f"{key_prefix}_TOKEN")
        return k8s_class(app_name, group_name, api_url, api_token, namespace, zone_code)
    except ApiException as err:
        current_app.logger.error(str(err))
        raise ParameterError(f'{url_key} 未配置')


def setup_log(config_obj, app):
    """
    Configure root logging, attach a rotating UTF-8 log file, and set the app logger level.

    Calling this more than once with the same ``LOG_FILE`` attaches the file
    handler only once, so records are not written to the file repeatedly.

    :param config_obj: object exposing ``LOG_LEVEL`` and ``LOG_FILE``
    :param app: object exposing ``logger`` (the Flask app)
    :return: None
    """
    import os  # function-scope import keeps this block self-contained

    # Set the logging level (basicConfig does nothing if the root logger already has handlers)
    logging.basicConfig(level=config_obj.LOG_LEVEL)

    root_logger = logging.getLogger()
    # RotatingFileHandler stores the absolute path in ``baseFilename``; compare like with like
    log_path = os.path.abspath(os.fspath(config_obj.LOG_FILE))
    already_attached = any(
        isinstance(handler, RotatingFileHandler) and handler.baseFilename == log_path
        for handler in root_logger.handlers
    )
    if not already_attached:
        # Rotating file handler: log path, maximum size per file (100 MiB), number of backups kept.
        # The encoding is fixed to UTF-8 because this module logs non-ASCII (Chinese) messages
        # and the default file encoding depends on the process locale.
        file_log_handler = RotatingFileHandler(
            config_obj.LOG_FILE, maxBytes=1024 * 1024 * 100, backupCount=10, encoding='utf-8')
        # Record format: time, path, file name, line number, level, thread, module, function, message
        formatter = logging.Formatter(
            "[%(asctime)s][%(pathname)s][%(filename)s][%(lineno)d][%(levelname)s][%(thread)d][%(module)s][%(funcName)s][%(message)s]")
        file_log_handler.setFormatter(formatter)
        # Attach to the root logger so records propagated from other loggers reach the file
        root_logger.addHandler(file_log_handler)

    # Set the level of Flask's own app.logger
    app.logger.setLevel(config_obj.LOG_LEVEL)


# User operation log
class OperationLog(object):
    """
    One row destined for the ``gb_operation_log`` table.

    The constructor only stores its arguments; ``set_log`` performs the insert.
    """

    def __init__(self, user_id, operation_key, page, result, env, created_time, content
                 ):
        """
        :param user_id: stored as ``user_id``
        :param operation_key: stored as ``operation_key``
        :param page: stored as ``page``
        :param result: stored as ``result``
        :param env: stored as ``env``
        :param created_time: stored as ``created_time``
        :param content: stored as ``content``
        """
        self.operation_key = operation_key
        self.created_time = created_time
        self.user_id = user_id
        self.page = page
        self.result = result
        self.env = env
        self.content = content

    def set_log(self, bindparams=None):
        """
        Insert this entry into ``gb_operation_log`` through ``current_app.db.session``.

        :param bindparams: forwarded as the second positional argument of ``text()``
        :return: whatever ``session.execute`` returns
        """
        # Column order here fixes both the column list and the placeholder list of the statement.
        columns = ("operation_key", "created_time", "user_id", "page", "result", "env", "content")
        row = {column: getattr(self, column) for column in columns}
        column_list = ','.join(columns)
        placeholder_list = ','.join(f':{column}' for column in columns)
        statement = f"insert into gb_operation_log ({column_list}) values ({placeholder_list});"
        # NOTE(review): confirm that the second positional parameter of text() in the
        # installed SQLAlchemy version really is the bind-parameter list.
        return current_app.db.session.execute(text(statement, bindparams), row)


class OperationNotify(object):
    """
    Sends email / SMS notifications for one operation key.

    Recipient ids and the per-channel flags are read from ``gb_notify``;
    email addresses and phone numbers are read from ``user``. All queries go
    through ``QueryMixin`` helpers defined elsewhere.
    """

    def __init__(self, operation_key, content, template_code=None, params=None):
        """
        :param operation_key: key used to select rows in ``gb_notify``
        :param content: message body; stored as ``self.text`` and used for email
        :param template_code: SMS template code; ``send_sms`` does nothing when falsy
        :param params: SMS template parameters; ``send_sms`` does nothing when falsy
        """
        self.operation_key = operation_key
        self.text = content
        self.template_code = template_code
        self.params = params

    def get_notify_user_list(self):
        """Return the ``user_id`` values stored in ``gb_notify`` for this operation key."""
        return QueryMixin.get_single_field_query_results(
            'select user_id from gb_notify where operation_key=:operation_key',
            operation_key=self.operation_key,
        )

    def get_recipients_email(self):
        """Return the ``email`` values of the users listed for this operation key."""
        subscriber_ids = self.get_notify_user_list()
        return QueryMixin.get_single_field_query_results(
            "select email from user where user_id in :user_list; ",
            user_list=subscriber_ids,
            fetch_result=True,
        )

    def get_recipients_phone(self):
        """Return the ``phone`` values of the users listed for this operation key."""
        subscriber_ids = self.get_notify_user_list()
        return QueryMixin.get_single_field_query_results(
            "select phone from user where user_id in :user_list; ",
            user_list=subscriber_ids,
            fetch_result=True,
        )

    def send_email(self):
        """
        Email ``self.text`` to the recipients, provided ``is_notify_email`` is truthy.

        :return: None
        """
        email_enabled = QueryMixin.get_single_field_query_result(
            'select distinct is_notify_email from `gb_notify` where operation_key=:operation_key',
            operation_key=self.operation_key,
        )
        if email_enabled:
            # Calls the module-level send_email helper, not this method.
            send_email("Genbu通知", self.get_recipients_email(), self.text)

    def send_sms(self):
        """
        Send the templated SMS to every recipient phone, provided ``is_notify_sms`` is truthy.

        Nothing is sent when the flag is falsy or when either ``template_code``
        or ``params`` is falsy.

        :return: None
        :raises ParameterError: as soon as one response has a ``Code`` other than
                                ``'OK'``; remaining recipients are not attempted
        """
        sms_enabled = QueryMixin.get_single_field_query_result(
            'select distinct is_notify_sms from `gb_notify` where operation_key=:operation_key',
            operation_key=self.operation_key,
        )
        if not sms_enabled:
            return
        if not (self.template_code and self.params):
            return

        phone_numbers = self.get_recipients_phone()
        client = AcsClient(current_app.config["ACCESS_KEY_ID"], current_app.config["ACCESS_KEY_SECRET"])
        for number in phone_numbers:
            response = send_sms(client, number, self.template_code, template_param=self.params)
            if json.loads(response)['Code'] == 'OK':
                current_app.logger.info(response.decode('utf8'))
                continue
            current_app.logger.error(response.decode('utf8'))
            raise ParameterError("短信发送失败")

  

# Original source: https://www.cnblogs.com/hude/p/13743812.html