# 数据中台-监测脚本 (data middle-platform monitoring script)

import datetime
import threading
from threading import Thread,Lock
from time import sleep
# `options` is not imported in this file; presumably the PyODPS global options
# object injected by the runtime (e.g. a DataWorks PyODPS node) — confirm.
# NOTE(review): sets the connection timeout to 60; unit assumed to be seconds.
options.connect_timeout=60

# Table-name discovery
def check_list(prefixName, sysName, projectName):
    """List the tables of one project, filtered by name prefix.

    Calls ``o.list_tables(project=projectName, prefix=prefixName)`` and returns
    one entry per table: ``[table_name, table_size, projectName, sysName]``.
    This is the positional argument order that ``check_sql`` takes.

    ``o`` is not defined in this file; presumably it is the PyODPS entry object
    provided by the runtime.
    """
    found = o.list_tables(project=projectName, prefix=prefixName)
    return [[tbl.name, tbl.size, projectName, sysName] for tbl in found]
# Per-table row count query
def check_sql(table_name, table_size, projectName, sysName):
    """Count one table's rows for the ``datadate`` partition and record it.

    Appends one row ``[sysName, table_name, cnt, datadate, searchtime, table_size]``
    to the module-level ``records`` list, where ``searchtime`` is the local time
    of the call. A table whose reported size is 0 is not queried. A row with a
    count of 0 is recorded for it instead.

    Relies on module-level names set elsewhere: ``datadate`` and ``records``
    (assigned in the ``__main__`` section) and ``o`` (not defined in this file;
    presumably the PyODPS entry object provided by the runtime).
    """
    searchtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if table_size == 0:
        # Empty table: skip the SQL round-trip entirely.
        # NOTE(review): this branch used to emit a 7-field row ([..., 0, 0])
        # while the query branch emits 6 fields. Both go to the same
        # write_table call, so they could not both match the target schema.
        # It now uses the 6-field layout; confirm against
        # bdc_jishu_ads_pro.check_result_dis_all_thread.
        records.append([sysName, table_name, 0, datadate, searchtime, 0])
        return
    ods_sql = ('select count(*) as cnt from ' + projectName + '.' + table_name
               + ' where ds=' + datadate)
    with o.execute_sql(ods_sql).open_reader() as reader:
        for record in reader:
            # Public index access instead of the private ``_values`` attribute.
            cnt = record[0]
            records.append([sysName, table_name, cnt, datadate, searchtime, table_size])
# Thread management
def check_thread(table_list):
    """Run ``check_sql`` for every table in *table_list*, one thread per table,
    and return once all of those threads have finished.

    :param table_list: entries ``[table_name, table_size, projectName, sysName]``,
        passed positionally to ``check_sql``.
    """
    print("线程管理开始")
    for nn in table_list:
        print(nn)
    threads = []
    for table in table_list:
        t = threading.Thread(target=check_sql,
                             args=(table[0], table[1], table[2], table[3]))
        # ``daemon`` attribute instead of the deprecated setDaemon().
        t.daemon = True
        t.start()
        # Short stagger between thread starts (kept from the original).
        sleep(0.01)
        threads.append(t)
    # Wait on exactly the threads started here. Polling threading.enumerate()
    # until at most one thread remains never finishes if the host process runs
    # any other long-lived thread, and it also waits on unrelated threads.
    for t in threads:
        t.join()
# 程序入口
if __name__ == '__main__':
    print("程序开始")
    # 保存表名集合
    table_list = []
    Mylist=[['dis_uep_bs_','95598业务支持系统','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_icms_cisd','基建管理系统','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_prs_uap','统一项目储备库管理系统','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_uvm_tycldb','统一车辆管理平台','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_ecp1_ebiz_bidpro','电子商务平台','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_elms_htgl','经济法律管理业务应用系统','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_fcm_cd_cwgk','集中部署财务管控','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_hrcs_','人力资源管理系统','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_mdm_sgmdm','主数据管理系统','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_erp_cd_sgp_sapsr3','集中部署ERP','PRO_DWD_EXCHANGE_ZB_DOWN'],
            ['dis_uep_pis_pps_','规划计划信息管理系统','PRO_DWD_EXCHANGE_ZB_DOWN']]
    # 获取业务时间
    datadate=str(${bdp.system.bizdate})
    # 获取表名
    before_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # print("获取表名开始时间"+before_time)
    for i in range(len(Mylist)):
        # print(i)
        table_list+=check_list(Mylist[i][0],Mylist[i][1],Mylist[i][2])
        # print(len(table_list))
    after_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # print("获取表名结束时间"+after_time)
    # 保存查询结果集合
    records=[]  
    # 线程规划
    # 线程数
    magicNum=800
    # 表名集合长度
    lens=len(table_list)
    for i in range(lens/magicNum+1):
        print("第"+ str(i+1) +"次循环")
        m=i*magicNum
        j = m + magicNum    
        if j < lens:
            check_thread(table_list[m:j])  
        else:
            check_thread(table_list[m:])
    if len(records)!=0: 
        # print(records)
        print(len(records))
        print('开始将结果写入数据表')
        o.write_table('bdc_jishu_ads_pro.check_result_dis_all_thread',records,partition='ds='+datadate,create_partition=True)
        print('写入结束!!!')

  

# We only live once, and time just goes by.
# 原文地址 (original article): https://www.cnblogs.com/jycjy/p/15183175.html