# RabbitMQ consumer ("rabbit 消费者"): cleans tracking-log messages into MySQL/Redis.

import pika
import json
import time
import os
import ast
import uuid
import time
import json
import hashlib

import redis
import pymysql

import logging
from logging import handlers



# 日志记录
class Logger(object):
    """Thin convenience wrapper that wires up a named logger with both a
    console handler and a time-rotating file handler."""

    # Maps a level name to the corresponding logging-module constant.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    def __init__(self, filename, level='info', when='D', backCount=3,
                 fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        formatter = logging.Formatter(fmt)
        self.logger = logging.getLogger(filename)
        self.logger.setLevel(self.level_relations.get(level))
        # Handler 1: echo records to the console.
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        # Handler 2: write to `filename`, rotating on a time interval.
        # `when` picks the unit (S seconds, M minutes, H hours, D days,
        # W weekday, 'midnight'); `backCount` rotated files are kept and
        # older ones are deleted automatically.
        rotating = handlers.TimedRotatingFileHandler(
            filename=filename, when=when, backupCount=backCount, encoding='utf-8')
        rotating.setFormatter(formatter)
        for handler in (console, rotating):
            self.logger.addHandler(handler)


# Module-level logger writing to clear/all.log.
# NOTE(review): the 'clear' directory is assumed to exist already --
# the file handler presumably fails on creation otherwise; confirm at deploy.
file = 'all'
log = Logger('clear' + os.sep + '%s.log' % file)
logger = log.logger

# Switch between local-test and production DB/Redis settings (see ClearLog).
DEBUG = False


class ClearLog(object):
    """Cleans raw tracking-log records and persists them.

    Responsibilities:
      * upsert cleaned rows into a monthly MySQL table (created on demand
        from the ``tarsier_log_details`` template table),
      * deduplicate page-view events via a Redis hash of fingerprints
        keyed by page id,
      * maintain Redis structures used to expire stale page ids and to
        compute page end times.

    All state is class-level; instances act as a process-wide singleton.
    """

    if DEBUG:
        DATABASE = 'unionlog'
        # Local test environment.
        poll = redis.ConnectionPool(host='192.168.10.10', port=7000, db=5, password='', decode_responses=True)
        conn = pymysql.connect(host='192.168.10.5', user='root',
                               password='root',
                               database=DATABASE, charset='utf8')
        cursor = conn.cursor()
    else:
        DATABASE = 'log'
        # Production environment.
        poll = redis.ConnectionPool(host='192.168.5.219', port=6379, db=5, password='', decode_responses=True)
        conn = pymysql.connect(host='', user='datacenter',
                               password='kbs11zx@',
                               database=DATABASE, charset='utf8')
        cursor = conn.cursor()
    CONN = redis.Redis(connection_pool=poll)
    # pid -> md5(fingerprint): last stored view fingerprint per page id.
    REDIS_PID_HASH = "tarsier.log.clear.pid.hash"
    # pid -> last-seen date string; used to derive the page end time.
    REDIS_PID_DELETE_HASH = "tarsier.log.delete.pid.hash"
    # Queue of {"pid", "date"} entries awaiting daily expiry.
    REDIS_PID_DELETE_LIST = "tarsier.log.delete.pid.list"
    # Scratch queue used while draining REDIS_PID_DELETE_LIST.
    REDIS_PID_DELETE_LIST_TEMP = "tarsier.log.delete.pid.list.temp"
    table_list = []  # cached result of SHOW TABLES
    table = 'tarsier_log_details'  # template/base table name
    instance = None  # singleton instance cache

    def __new__(cls, *args, **kwargs):
        # Singleton. Bug fix: the original never assigned cls.instance,
        # so every call produced a brand-new object.
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    @staticmethod
    def _refresh_table_list():
        # Reload the cached list of table names from MySQL.
        ClearLog.cursor.execute("SHOW TABLES")
        ClearLog.table_list = [row[0] for row in ClearLog.cursor.fetchall()]

    @staticmethod
    def get_table_list(table):
        """Ensure *table* exists, creating it from the template if needed."""
        ClearLog.table = table
        if table in ClearLog.table_list:
            # Known from the cache -- nothing to do.
            return
        ClearLog._refresh_table_list()
        if table in ClearLog.table_list:
            return
        # Still missing: clone the template table's schema.
        sql = """create table %s like tarsier_log_details""" % (table)
        try:
            print('创建表')
            ClearLog.cursor.execute(sql)
        except Exception:
            # Best effort: a concurrent consumer may have created it first.
            pass
        ClearLog._refresh_table_list()

    # Upsert one cleaned record into the current month's table.
    @staticmethod
    def updata_db(data):
        """Insert *data* (column -> value dict), updating on duplicate key.

        (Method name keeps the historical 'updata' spelling for callers.)
        """
        # ##################### monthly table name #####################
        table = "tarsier_log_details_%s" % ClearLog.timestamp_to_str(format="%Y%m")

        ClearLog.get_table_list(table)
        keys = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        # INSERT ... ON DUPLICATE KEY UPDATE: update the row in place when
        # the primary key already exists.
        sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE'.format(table=table, keys=keys,
                                                                                             values=values)
        update = ','.join([" {key} = %s".format(key=key) for key in data])
        sql += update
        try:
            # Values are bound twice: once for INSERT, once for UPDATE.
            ClearLog.cursor.execute(sql, tuple(data.values()) * 2)
            print('update Successful')
            ClearLog.conn.commit()
        except Exception as e:
            logger.error(e)
            print('update Failed')

    @staticmethod
    def update_db_sql(sql):
        """Execute a raw SQL statement, committing on success and rolling
        back (and logging) on failure."""
        try:
            ClearLog.cursor.execute(sql)
            ClearLog.conn.commit()
            print('更新成功')
        except Exception as e:
            print("ERROR:{}".format(str(e)))
            ClearLog.conn.rollback()  # roll back the failed transaction
            logger.info('error:%s' % str(e))

    def __call__(self, *args, **kwargs):
        pass

    def __init__(self):
        # All work happens through static/class-level state; nothing to do.
        pass

    @staticmethod
    def md5_me(key):
        """Return the hex MD5 digest of str(key)."""
        md5 = hashlib.md5()
        md5.update(str(key).encode('utf-8'))
        return md5.hexdigest()

    @staticmethod
    def main():
        """Batch mode: replay a dumped log file through the cleaner."""
        with open('20201110.log', encoding='utf-8') as f:
            count = 0
            for item in f:
                line = item.strip()
                # Each line is a Python-literal dict; literal_eval is safe
                # (no arbitrary code execution).
                data = ast.literal_eval(line)
                ClearLog.clear_log(data)
                count += 1
                if count % 10000 == 0:
                    print(count)  # progress indicator

    @staticmethod
    def main2(data):
        """Streaming mode: clean a single already-parsed record."""
        ClearLog.clear_log(data)

    @staticmethod
    def clear_log(data):
        """Normalize one raw event dict and store it.

        View events (rshyType 'view' or empty) are deduplicated: a record
        is written only when the (uid, url, referrer, UA, device)
        fingerprint differs from the one last stored for the same page id.
        All other event types are written unconditionally.
        """
        res_data = {}
        rsUid = data.get('rsUid', '')
        rsPageId = data.get('rsPageId', '')
        rshyuid = data.get('rshyuid', '')
        pageid = data.get('pageid', '')
        pageUrl = data.get('pageUrl', '')
        userAgent = data.get('userAgent', '')
        referrer = data.get('referrer', '')
        nowDate = data.get('nowDate', '')
        device = data.get('device', '')
        rshyType = data.get('rshyType', '')
        targetDataset = str(data.get('targetDataset', ''))
        targetValue = data.get('targetValue', '')
        targetClassName = data.get('targetClassName', '')
        inputData = str(data.get('inputData', ''))
        rshyUserIp = data.get('rshyUserIp', '')
        netloc = data.get('netloc', '')
        urlPath = data.get('urlPath', '')
        siteName = data.get('siteName', '')
        TIME = ClearLog.timestamp_to_str()
        ID = ClearLog.get_uuid()
        rshyTime = data.get('rshyTime', '')
        try:
            # Split "YYYY-MM-DD HH:MM:SS" into date and hour parts.
            rsdate = rshyTime.split()[0]
            temp = rshyTime.split()[1]
            rshour = temp.split(':')[0]
        except Exception:
            # Malformed/absent timestamp: keep placeholder values.
            rsdate = ''
            rshour = 0
        res_data.update({
            "id": ID,
            "rsuid": rsUid,
            "rshytime": rshyTime,
            "rshour": rshour,
            "rsdate": rsdate,
            "rspageid": rsPageId,
            "rshyuid": rshyuid,
            "pageid": pageid,
            "pageurl": pageUrl,
            "useragent": userAgent,
            "referrer": referrer,
            "device": device,
            "rshytype": rshyType,
            "targetvalue": targetValue,
            "targetdataset": targetDataset,
            "targetclassname": targetClassName,
            "inputdata": inputData,
            "starttime": nowDate,
            "rshyuserip": rshyUserIp,
            "netloc": netloc,
            "urlpath": urlPath,
            "sitename": siteName,
            "createtime": TIME,
            "updatetime": TIME,
        })
        if rshyType == 'view' or rshyType == '':
            rsUidKey = rsPageId  # page id doubles as the dedup key
            if not rsPageId:
                return

            # Refresh this page id's last-seen time on every view.
            ClearLog.CONN.hset(ClearLog.REDIS_PID_DELETE_HASH, rsUidKey, nowDate)

            res_temp = rsUid + pageUrl + referrer + userAgent + device
            res_rs_uid = ClearLog.md5_me(res_temp)
            # Compare with the fingerprint stored for this page id; skip
            # the write when nothing has changed.
            exist_uid = ClearLog.CONN.hget(ClearLog.REDIS_PID_HASH, rsUidKey)
            if not exist_uid or res_rs_uid != str(exist_uid):
                ClearLog.CONN.hset(ClearLog.REDIS_PID_HASH, rsUidKey, res_rs_uid)
                # Persist the cleaned record.
                ClearLog.write_data(res_data)
                # Keep a timestamp copy for the dwell-time job.
                ClearLog.CONN.hset(ClearLog.REDIS_PID_DELETE_HASH, rsUidKey, nowDate)
                # Queue the page id for daily fingerprint expiry.
                data_temp = {"pid": rsUidKey, "date": nowDate}
                ClearLog.CONN.lpush(ClearLog.REDIS_PID_DELETE_LIST, json.dumps(data_temp))
            return
        ClearLog.write_data(res_data)

    @staticmethod
    def write_data(data):
        """Upsert *data* to MySQL and append it to a daily audit file."""
        ClearLog.updata_db(data)
        file_name = ClearLog.timestamp_to_str_m()
        with open('clear{}{}.clear.log'.format(os.sep, file_name), 'a+', encoding='utf-8') as f:
            # Bug fix: the newline escape had been mangled into a literal
            # line break inside the string (a SyntaxError).
            f.write(str(data) + '\n')

    # Parse a formatted time string into a unix timestamp.
    @staticmethod
    def str_to_timestamp(str_time=None, format='%Y-%m-%d %H:%M:%S'):
        """Return int timestamp for *str_time*, or the current time if falsy."""
        if str_time:
            time_tuple = time.strptime(str_time, format)
            return int(time.mktime(time_tuple))
        return int(time.time())

    # Format a unix timestamp (local time); defaults to "now".
    @staticmethod
    def timestamp_to_str(timestamp=None, format='%Y-%m-%d %H:%M:%S'):
        if timestamp:
            time_tuple = time.localtime(timestamp)
            return time.strftime(format, time_tuple)
        else:
            return time.strftime(format)

    # Same as timestamp_to_str but with a date-only default format.
    @staticmethod
    def timestamp_to_str_m(timestamp=None, format='%Y-%m-%d'):
        if timestamp:
            time_tuple = time.localtime(timestamp)
            return time.strftime(format, time_tuple)
        else:
            return time.strftime(format)

    @staticmethod
    def get_uuid():
        """Return a random UUID4 as 32 hex characters (dashes removed)."""
        res = str(uuid.uuid4())
        return ''.join(res.split('-'))

    # Runs every 5 minutes: for page ids whose last view is >= 60s old,
    # persist the end time and drop the hash entry.
    @staticmethod
    def del_tarsier_log_pid_hash():
        # Bug fix: the original used ClearLog.table, which get_table_list
        # mutates to the already-suffixed monthly name, yielding
        # e.g. tarsier_log_details_202012_202012 after the first insert.
        table = 'tarsier_log_details' + '_%s' % ClearLog.timestamp_to_str_m(format='%Y%m')

        print('每5分钟删除一次hash中的值,并将停留时间算出')
        get_pid_list = ClearLog.CONN.hgetall(ClearLog.REDIS_PID_DELETE_HASH)
        for hash_pid_item in get_pid_list:

            redisDate = ClearLog.CONN.hget(ClearLog.REDIS_PID_DELETE_HASH, hash_pid_item)
            # Only flush entries older than one minute; fresher ones are
            # left for the next run.
            try:
                redis_data_time = ClearLog.str_to_timestamp(redisDate)
                now_data_time = time.time()
                chufatime = now_data_time - redis_data_time
                if chufatime >= 60:
                    sql = """update {} set endtime='{}' where rspageid='{}'""".format(table, redisDate,
                                                                                      hash_pid_item)
                    print(sql)
                    ClearLog.update_db_sql(sql)
                    # Entry handled -- remove it from the hash.
                    ClearLog.CONN.hdel(ClearLog.REDIS_PID_DELETE_HASH, hash_pid_item)
            except Exception:
                # Best effort: entries with unparsable dates are skipped.
                pass
        print('====================================')

    # Runs daily: expire dedup fingerprints older than 24 hours.
    @staticmethod
    def del_tarsier_log_pid_list():
        logger.info('每一天清除一次队列信息')
        res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST)
        while res_str:
            try:
                # Entries look like {"pid": ..., "date": ...}.
                res_json = json.loads(res_str)
                nowDate = res_json.get("date", '')
                rsUidKey = res_json.get("pid", '')
                redis_data_time = ClearLog.str_to_timestamp(nowDate)
                now_data_time = time.time()
                chufatime = now_data_time - redis_data_time
                if chufatime >= 24 * 60 * 60:
                    # Older than a day: drop the dedup fingerprint.
                    ClearLog.CONN.hdel(ClearLog.REDIS_PID_HASH, rsUidKey)
                else:
                    # Not yet expired: park it on the temp queue.
                    ClearLog.CONN.rpush(ClearLog.REDIS_PID_DELETE_LIST_TEMP, json.dumps(res_json))
            except Exception as e:
                logger.error(e)
            # Bug fix: the next lpop used to sit inside the try block, so a
            # malformed message left res_str unchanged and spun this loop
            # forever on the same item.
            res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST)
        # Move parked (unexpired) entries back onto the main queue.
        res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST_TEMP)
        while res_str:
            res_json = json.loads(res_str)
            ClearLog.CONN.rpush(ClearLog.REDIS_PID_DELETE_LIST, json.dumps(res_json))
            res_str = ClearLog.CONN.lpop(ClearLog.REDIS_PID_DELETE_LIST_TEMP)
        logger.info('清除完毕')

def timestamp_to_str_day(timestamp=None, format='%Y%m%d'):
    """Format a unix timestamp as a local-time day string ('YYYYMMDD').

    A falsy *timestamp* (None or 0) formats the current local time instead.
    """
    if not timestamp:
        return time.strftime(format)
    return time.strftime(format, time.localtime(timestamp))


# Connect to RabbitMQ and create channel.
# NOTE(review): credentials are hard-coded; consider moving them to config/env.
rabbit_host = "192.168.2.129"
rabbit_username = 'rshy'
rabbit_password = 'root1234@AWJSW'
queue_topic = 'logs.collect.statistics'
user = pika.PlainCredentials(rabbit_username, rabbit_password)
# Connection is opened at import time; presumably heartbeat=0 (disabled) was
# tried at some point -- confirm the desired heartbeat policy.
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host, credentials=user,)) #  heartbeat=0
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.2.129'))
channel = connection.channel()

# Declare and listen queue
# channel.queue_declare(queue=cfg.QUEUE_TOPIC)
channel.queue_declare(queue=queue_topic)

# Shared cleaning-pipeline instance used by the consumer callback.
ClearLogObj = ClearLog()
def consumer():
    """Consume messages from the RabbitMQ queue until interrupted.

    Each message body is JSON; it is cleaned/persisted via ClearLogObj and
    the raw record is appended to a per-day file under 'consumer/'.
    """
    print(' [*] Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        # Decode the message and run it through the cleaning pipeline.
        data = json.loads(body)
        print("--data--:", data)
        ClearLogObj.main2(data)
        # Append the raw record to today's audit file.
        # NOTE(review): the 'consumer' directory is assumed to exist.
        file_name = timestamp_to_str_day()
        with open('consumer' + os.sep + file_name + '.log', 'a+', encoding='utf-8') as f:
            # Bug fix: the newline escape had been mangled into a literal
            # line break inside the string (a SyntaxError).
            f.write(str(data) + '\n')

    # Third positional arg (auto_ack=True): messages are acknowledged on
    # delivery, so a crash mid-callback loses that message.
    channel.basic_consume(queue_topic, callback, True)
    channel.start_consuming()


if __name__ == '__main__':
    consumer()
# Original source: https://www.cnblogs.com/xiao-xue-di/p/14074463.html