python实用脚本-定时导出数据库中的数据并且发送数据到邮箱

1、发送邮件脚本

#coding=utf-8
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr
from email.utils import parseaddr, formataddr
from email.mime.multipart import MIMEMultipart



mail_list = {
    'sender':'test@qq.com', #发件人
    'receivers':'****@qq.com', #收件人
    'title':'*****', #邮件主题
    'content':'*****', #邮件内容
    'filename':'',#附件内容
    'server':'localhost' #发送邮件服务器。默认为本机
}


def _format_addr(s):
    name, addr = parseaddr(s)
    return formataddr((Header(name, 'utf-8').encode(), addr))  #

def sendmail(**kwargs):
    #sender,receivers,title,content,filename,server = 'localhost'
    '''
    :param sender:  发件人 my_sender = 'test1<test1@yonyou.com>,test<test@yonyou.com>'
    :param receivers:  收件人
    :param title:  邮件标题
    :param content: 邮件内容
    :param attfilename: 附件名称
    :param server: 服务器的名字默认为localhost,本机
    :return:
    '''
    message = MIMEMultipart()
    message['From'] = _format_addr(kwargs['sender'])
    message['Subject'] = Header(kwargs['title'], 'utf-8')
#    message['To'] = _format_addr(kwargs['receivers']) #收件人只显示自己
#    print(kwargs['receivers'])
    message['To'] = kwargs['receivers']# 可以看得见多个收件人


    #
    message.attach(MIMEText(kwargs['content'], 'plain', 'utf-8'))
    att1 = MIMEText(open(kwargs['attfilename'], 'rb').read(), 'base64', 'utf-8')
    att1["Content-Type"] = 'application/octet-stream'
    att1["Content-Disposition"] = 'attachment; filename=%s'%(kwargs['attfilename'])
    message.attach(att1)

    try:
        smtpObj = smtplib.SMTP(kwargs['server'])
        smtpObj.sendmail(kwargs['sender'], kwargs['receivers'].split(','), message.as_string())
        print(kwargs['receivers'].split(','))
    except smtplib.SMTPException:
        print("Error: 无法发送邮件")


#sendmail(**mail_list)

2、导出excle脚本

# coding:utf8
import sys
import xlwt
import mysql.connector
import datetime
import send_mail
import os
import time

nowtime = datetime.datetime.now().strftime('%Y%m%d')
oddir="/app/crontab/python"
filename = "%s/test%s.xls"%(oddir,nowtime)
file_count = 0
end_date=time.strftime("%Y-%m-%d", time.localtime())

sql='''select*  from   test  t where   t.create_time > '2019-07-01 00:00:00' and t.create_time <%s; 
'''



config = {
    'user': '*****',
    'password': '*****',
    'host': '********',
    'port': '3306',
    'database': '********',
    'charset': 'utf8'
}


mail_list = {
    'sender':'test@PH-LOTH-V159.localdomain', #发件人
    'receivers':'test@yonyou.com', #收件人
    'title':'注册统计', #邮件主题
    'content':'统计数据', #邮件内容
    'attfilename':'',#附件内容
    'server':'localhost' #发送邮件服务器。默认为本机
}



def get_conn():
    conn = mysql.connector.connect(**config)
    return conn


def query_all(cur, sql, args):
    '''
    #参数说明
    :param cur:
    :param sql: 执行的sql
    :param args: sql中的桉树
    :return:
    '''
    print(sql)
    print(args)
    cur.execute(sql, args)
    return cur.fetchall()


def export_excle(filename, sql, outputpath):
    '''
    # 参数说明
    :param filename: 文件名称
    :param sql: 执行脚本
    :param outputpath: 输出路径
    :return:
    '''
    file_count = 0
    try:
        conn = get_conn()
        cur = conn.cursor()

        # 查询结果
        results = query_all(cur=cur, sql=sql, args=[end_date])
        # 获取MYSQL里面的数据字段名称
        fields = cur.description

        # 获取excle对象
        workbook = xlwt.Workbook()
        sheet = workbook.add_sheet('sheet1', cell_overwrite_ok=True)
        # not isinstance(test[2][0],str)

        # 写入表头信息
        for line in range(0, len(fields)):
            if isinstance(fields[line][0], (bytes, bytearray)):
                sheet.write(0, line, u'%s' % (fields[line][0]).decode())
            else:
                sheet.write(0, line, u'%s' % (fields[line][0]))

        # 写入数据信息
        row = 1  # 第二行开始
        col = 0
        file_count =len(results)
        for row in range(1, len(results)+1):
            for col in range(0, len(fields)):
                if isinstance(results[row - 1][col], (bytes, bytearray)):
                    sheet.write(row, col, u'%s' % results[row - 1][col].decode())
                else:
                    sheet.write(row, col, u'%s' % results[row - 1][col])
        #workbook.save("%s/%s.xls"%(oddir,filename) )  #filename
        workbook.save(filename)

    finally:
        if conn:
            conn.close()
    return file_count


# 结果测试
if __name__ == "__main__":
   # sql = input("请输入要查询的sql:")
    #filename = input("请输入excle的名字")
    file_count =  export_excle(filename, sql, oddir)  #执行导出结果
    if os.path.exists(filename):
        if file_count >=1:
           print("邮件已经发送 %s"%(mail_list['attfilename']))
           mail_list['attfilename'] = filename
           print("邮件已经发送 %s"%(mail_list['attfilename']))
           send_mail.sendmail(**mail_list)
        else :
           print("文件大小为空,请检查")
    else :
        print("文件不存在,请联系管理员检查脚本是否有异常")
原文地址:https://www.cnblogs.com/xiajq/p/11214569.html