python2案例:为Linux主机设置登陆成功的邮件提醒

一、背景

我们的主机被攻击者破解后,ssh登陆,我们不能及时知道。那么开发一个程序,提示所有成功ssh登陆到主机的情况,就十分必要。

二、分析

1、采用python2实现发邮件的程序。

2、Linux设置ssh登陆成功就触发py2的程序。

三、代码实现

centos7上:

1、采用python2实现发邮件的程序。

(1)创建程序存放目录

mkdir -p /usr/local/system_script/ssh_on_email/

(2)创建python2程序

cd  /usr/local/system_script/ssh_on_email/

vim  ssh_on_notes.py

如下代码中配置要改:

V1.0-简单功能(不推荐)

# -*-coding:utf-8-*-
import os
import smtplib
from email.MIMEText import MIMEText
from email.Header import Header
from email.mime.multipart import MIMEMultipart
import base64
import socket


# base64 2 str
def base642str(pwd_encode_str):
    base64_decrypt = base64.b64decode(pwd_encode_str.encode('utf-8'))
    # pwd_decode_str = str(base64_decrypt)
    return base64_decrypt


# get self ip
def get_host_ip():
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    except Exception, e:
        ip = False
    finally:
        s.close()

    return ip


# send email
def send_mail(prepare_list, to_list, info_list, port=465):
    em_host = prepare_list[0]
    em_user = prepare_list[1]
    em_pass = prepare_list[2]

    em_head = info_list[0]
    em_content = info_list[1]
    em_attach_list = info_list[2]

    # 1.创建邮件。创建一个带附件的实例
    msg = MIMEMultipart()
    me = "andy-seeker" + "<" + em_user + ">"
    msg['From'] = me
    msg['To'] = ";".join(to_list)
    msg['Subject'] = em_head

    # no attachment
    if len(em_attach_list) == 0:
        em_content += str("There is no attachment.
")

    # exist attachment
    else:
        # 循环添加附件
        for attach_path_name in em_attach_list:
            # 判断附件路径是否有效,无效附件报错
            assert os.path.isfile(attach_path_name), 'The path of txt file is not correct'

            # 构造附件1,传送当前目录下的 test.txt 文件
            attach_name = os.path.basename(attach_path_name)
            att1 = MIMEText(open(attach_path_name, 'rb').read(), 'base64', 'utf-8')
            att1["Content-Type"] = 'application/octet-stream'

            # 中文附件(英文也适用)
            att1.add_header("Content-Disposition", "attachment", filename=("utf-8", "", attach_name))

            # 添加一个附件
            msg.attach(att1)
    # 添加邮件正文内容
    # msg.attach(MIMEText(em_content, 'html', 'gb2312')) # 邮件正文html格式
    msg.attach(MIMEText(em_content, 'plain', 'gb2312'))  # 邮件正文纯文本格式
    # 打印正文内容
    print(em_content)

    try:
        # 2.登录账号(加密传输)
        # sever = smtplib.SMTP(smtp_sever,25) # 明文传输端口号是25
        sever = smtplib.SMTP_SSL(em_host, port)  # 加密传输端口号是465
        sever.login(em_user, em_pass)
        # 3.发送邮件
        sever.sendmail(me, to_list, msg.as_string())
        sever.quit()
        return True
    except Exception as e:
        print(str(e))
        return False


if __name__ == '__main__':
    # 1 构建参数
    prepare_list = [None, None, None]
    to_list = ["xxx@qq.com"]  # 你的收件箱
    info_list = [None, None, None]
    # 1.1 构建连接邮箱的参数
    em_host = prepare_list[0] = "smtp.163.com"
    em_user = prepare_list[1] = "yyy@163.com"  # 你的发件箱
    em_pass = prepare_list[2] = ""  # 你的发件箱的密码。不是邮箱登陆密码,而是SMTP授权码。登陆你的邮箱,开启SMTP服务获取。

    # get your host ip
    ip_flag = get_host_ip()
    sender_ip = ""
    if ip_flag is False:
        sender_ip = "xxx"
    else:
        sender_ip = ip_flag
    # 1.2 构建邮件的信息
    em_head = info_list[0] = "ssh notes from ip %s" % sender_ip
    em_content = "Hi andy,
"
    em_content += "   someone has successfully logged in to your host(%s) through SSH.
 " % sender_ip
    em_content += "  If it is not your operation, please change the password in time.
"
    info_list[1] = em_content
    em_attach = info_list[2] = []

    # 2 发送邮件
    flag = send_mail(prepare_list, to_list, info_list)
    if flag:
        print("send_mail run successed")
    else:
        print("send_mail run failed")

  

V2.0-丰富功能(强烈推荐)

# -*-coding:utf-8-*-
import re
import os
import smtplib
from email.MIMEText import MIMEText
from email.Header import Header
from email.mime.multipart import MIMEMultipart
import base64
import socket
from subprocess import Popen, PIPE
import json
import sys

reload(sys)  # Python2.5 初始化后会删除 sys.setdefaultencoding 这个方法,我们需要重新载入
sys.setdefaultencoding('utf-8')


# base64 2 str
def base642str(pwd_encode_str):
    base64_decrypt = base64.b64decode(pwd_encode_str.encode('utf-8'))
    return base64_decrypt


# get self ip
def get_host_ip():
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    except Exception, e:
        ip = False
    finally:
        s.close()

    return ip


# cmd run function
def cmd(cmd_):
    child = Popen(cmd_, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
    out, err = child.communicate()
    ret = child.wait()
    return (ret, out, err)


# send email
def send_mail(prepare_list, to_list, info_list, port=465):
    em_host = prepare_list[0]
    em_user = prepare_list[1]
    em_pass = prepare_list[2]

    em_head = info_list[0]
    em_content = info_list[1]
    em_attach_list = info_list[2]

    # 1.创建邮件。创建一个带附件的实例
    msg = MIMEMultipart()
    me = "andy-seeker" + "<" + em_user + ">"
    msg['From'] = me
    msg['To'] = ";".join(to_list)
    msg['Subject'] = em_head

    # no attachment
    if len(em_attach_list) == 0:
        em_content += str("There is no attachment.
")

    # exist attachment
    else:
        # 循环添加附件
        for attach_path_name in em_attach_list:
            # 判断附件路径是否有效,无效附件报错
            assert os.path.isfile(attach_path_name), 'The path of txt file is not correct'

            # 构造附件1,传送当前目录下的 test.txt 文件
            attach_name = os.path.basename(attach_path_name)
            att1 = MIMEText(open(attach_path_name, 'rb').read(), 'base64', 'utf-8')
            att1["Content-Type"] = 'application/octet-stream'

            # 中文附件(英文也适用)
            att1.add_header("Content-Disposition", "attachment", filename=("utf-8", "", attach_name))

            # 添加一个附件
            msg.attach(att1)
    # 添加邮件正文内容
    # msg.attach(MIMEText(em_content, 'html', 'gb2312')) # 邮件正文html格式
    msg.attach(MIMEText(em_content, 'plain', 'gb2312'))  # 邮件正文纯文本格式
    # 打印正文内容
    # print(em_content)

    try:
        # 2.登录账号(加密传输)
        # sever = smtplib.SMTP(smtp_sever,25) # 明文传输端口号是25
        sever = smtplib.SMTP_SSL(em_host, port)  # 加密传输端口号是465
        sever.login(em_user, em_pass)
        # 3.发送邮件
        sever.sendmail(me, to_list, msg.as_string())
        sever.quit()
        return True
    except Exception as e:
        # print(str(e))
        return False


# ip归属地
def ip_belong(ip):
    ip_be = ""
    # ip有值
    if len(ip) >= 7:
        r = cmd("curl http://ip-api.com/json/%s?lang=zh-CN" % ip)
        if r[0] == 0:  # 请求成功
            # print(type(r[1]))
            # print(r[1])

            data_dict = json.loads(r[1])
            # print(type(data_dict))
            # print(data_dict)
            # print(data_dict["country"])
            if data_dict["status"] == "success":  # 成功查询到ip归属地
                # print(data_dict["country"])
                # print(data_dict["regionName"])
                # print(data_dict["city"])
                ip_be = "%s%s%s" % (data_dict["country"], data_dict["regionName"], data_dict["city"])
                # print(ip_be)
            else:  # 没有查到ip归属地
                ip_be = "可能是内网ip"
                # print(ip_be)
        else:  # 查询失败
            ip_be = "ip查询失败"

    # ip没有值
    else:
        # print("ip: %s 是无效值" % ip)
        ip_be = "ip: %s 是无效值" % ip
    return ip_be


if __name__ == '__main__':
    prepare_list = [None, None, None]
    to_list = ["xxx@qq.com"]
    info_list = [None, None, None]
    em_host = prepare_list[0] = "smtp.163.com"
    em_user = prepare_list[1] = "yyy@163.com"
    em_pass = prepare_list[2] = ""

    # # 查询本机的内网ip
    # ip_flag = get_host_ip()
    # sender_ip = ""
    # if ip_flag is False:
    #     sender_ip = "xxx"
    # else:
    #     sender_ip = ip_flag

    # 查询是谁登陆在线
    r = cmd("who")
    who_info = r[1]
    # print(who_info)


    ret_list = re.findall(r"(?:[0-9]{1,3}.){3}[0-9]{1,3}", who_info)  # ip_visitor=ret_list[0]
    ret_list = ret_list[-10:]  # 获取最新登陆记录
    ip_visitor = ret_list[0]
    # print(ip_visitor)
    ret_list = re.findall(r"S*", who_info)  # who_visitor=ret_list[0]
    ret_list = ret_list[-10:]  # 获取最新登陆记录
    who_visitor = ret_list[0]
    date_visit = ret_list[4]
    time_visit = ret_list[6]
    # print(ret_list[4])
    # print(ret_list[6])
    # print(who_visitor)
    # r = cmd("whoami")
    # who_visitor = r[1].replace("
", "")

    # 查询访问者ip归属地
    ip_be = ip_belong(ip_visitor)

    # 查询本机的公网ip
    r = cmd("echo $(curl -s http://txt.go.sohu.com/ip/soip)| grep -P -o -i '(d+.d+.d+.d+)'")
    sender_ip_pub = ""
    if r[0] == 0:  # 查询本机公网ip成功
        # sender_ip_pub = r[1].replace("
", "")  # 去掉换行
        sender_ip_pub = r[1].rstrip('
')  # 去掉最右边的换行
    else:  # 查询本机公网ip失败
        sender_ip_pub = "Failed to query public IP"

    # 编辑邮件信息:标题、正文、附件
    em_head = info_list[0] = "ssh notes from ip %s" % sender_ip_pub
    em_content = "Hi andy,
"
    em_content += "   someone has successfully logged in to your host(%s) through SSH.
 " % sender_ip_pub
    em_content += "  If it is not your operation, please change the password in time.

"
    detail_info = (who_visitor, ip_visitor, ip_be, date_visit, time_visit, sender_ip_pub)
    em_content += "用户 %s 用 IP:%s 来自:%s 
在 %s %s 该用户成功登陆了服务器:%s.
" % detail_info
    em_content += "
who:
%s
" % who_info
    info_list[1] = em_content
    em_attach = info_list[2] = []  # 邮件附件文件

    flag = False
    flag = send_mail(prepare_list, to_list, info_list)

    # if flag:
    #     print("send_mail run successed")
    # else:
    #     print("send_mail run failed")

  

V3.0-修复一些bug。还有一点小bug

# -*-coding:utf-8-*-
import re
import os
import smtplib

import time
from email.MIMEText import MIMEText
from email.Header import Header
from email.mime.multipart import MIMEMultipart
import base64
import socket
from subprocess import Popen, PIPE
import json
import sys

reload(sys)  # Python2.5 初始化后会删除 sys.setdefaultencoding 这个方法,我们需要重新载入
sys.setdefaultencoding('utf-8')


# base64 2 str
def base642str(pwd_encode_str):
    base64_decrypt = base64.b64decode(pwd_encode_str.encode('utf-8'))
    return base64_decrypt


# get self ip : inner ip
def get_host_ip():
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    except Exception, e:
        ip = "failed query inner ip"
    finally:
        s.close()

    return ip


# 获取本机公网ip
def get_host_ip_pub():
    r = cmd("echo $(curl -s http://txt.go.sohu.com/ip/soip)| grep -P -o -i '(d+.d+.d+.d+)'")
    if r[0] == 0:  # 查询本机公网ip成功
        # sender_ip_pub = r[1].replace("
", "")  # 去掉换行
        sender_ip_pub = r[1].rstrip('
')  # 去掉最右边的换行
    else:  # 查询本机公网ip失败
        sender_ip_pub = "Failed to query public IP"
    return sender_ip_pub


# cmd run function
def cmd(cmd_):
    child = Popen(cmd_, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
    out, err = child.communicate()
    ret = child.wait()
    return (ret, out, err)


# send email
def send_mail(prepare_list, to_list, info_list, port=465):
    em_host = prepare_list[0]
    em_user = prepare_list[1]
    em_pass = prepare_list[2]

    em_head = info_list[0]
    em_content = info_list[1]
    em_attach_list = info_list[2]

    # 1.创建邮件。创建一个带附件的实例
    msg = MIMEMultipart()
    me = "andy-seeker" + "<" + em_user + ">"
    msg['From'] = me
    msg['To'] = ";".join(to_list)
    msg['Subject'] = em_head

    # no attachment
    if len(em_attach_list) == 0:
        em_content += str("There is no attachment.
")

    # exist attachment
    else:
        # 循环添加附件
        for attach_path_name in em_attach_list:
            # 判断附件路径是否有效,无效附件报错
            assert os.path.isfile(attach_path_name), 'The path of txt file is not correct'

            # 构造附件1,传送当前目录下的 test.txt 文件
            attach_name = os.path.basename(attach_path_name)
            att1 = MIMEText(open(attach_path_name, 'rb').read(), 'base64', 'utf-8')
            att1["Content-Type"] = 'application/octet-stream'

            # 中文附件(英文也适用)
            att1.add_header("Content-Disposition", "attachment", filename=("utf-8", "", attach_name))

            # 添加一个附件
            msg.attach(att1)

    # 添加邮件正文内容
    # msg.attach(MIMEText(em_content, 'html', 'gb2312')) # 邮件正文html格式
    msg.attach(MIMEText(em_content, 'plain', 'gb2312'))  # 邮件正文纯文本格式
    # 打印正文内容
    # print(em_content)

    try:
        # 2.登录账号(加密传输)
        # sever = smtplib.SMTP(smtp_sever,25) # 明文传输端口号是25
        sever = smtplib.SMTP_SSL(em_host, port)  # 加密传输端口号是465
        sever.login(em_user, em_pass)

        # 3.发送邮件
        sever.sendmail(me, to_list, msg.as_string())
        sever.quit()
        return True
    except Exception as e:
        # print(str(e))
        return False


# ip归属地
def ip_belong(ip):
    ip_be = ""
    # ip有值
    if len(ip) >= 7:
        r = cmd("curl http://ip-api.com/json/%s?lang=zh-CN" % ip)
        if r[0] == 0:  # 请求成功
            # print(type(r[1]))
            # print(r[1])

            data_dict = json.loads(r[1])
            # print(type(data_dict))
            # print(data_dict)
            if data_dict["status"] == "success":  # 成功查询到ip归属地
                # print(data_dict["country"])
                # print(data_dict["regionName"])
                # print(data_dict["city"])
                ip_be = "%s%s%s" % (data_dict["country"], data_dict["regionName"], data_dict["city"])
                # print(ip_be)
            else:  # 没有查到ip归属地
                ip_be = "可能是内网ip"
                # print(ip_be)
        else:  # 查询失败
            ip_be = "ip查询失败"

    # ip没有值
    else:
        # print("ip: %s 是无效值" % ip)
        ip_be = "ip: %s 是无效值" % ip
    return ip_be


# 日期转时间戳
def datestr2timeint(date_str='2016-05-05 20:28:54', format='%Y-%m-%d %H:%M:%S'):
    '''
    日期字符串 转为 时间戳。精确到s,单位秒。
    输入举例说明:
    ('2016-05-05 20:28:54')
    ('2016-05-05 20:28:54','%Y-%m-%d %H:%M:%S')
    ('20160505 20:28:54','%Y%m%d %H:%M:%S')
    ('20160505 20_28_54','%Y%m%d %H_%M_%S')
    ('20160505','%Y%m%d')
    :param date_str:日期字符串
    :param format:输入日期字串的日期格式、样式
    :return:转换为int的时间戳
    '''
    # 将时间字符串转为时间戳int
    dt = date_str
    # 转换成时间数组
    timeArray = time.strptime(dt, format)
    # 转换成时间戳
    timestamp = int(time.mktime(timeArray))

    return timestamp


if __name__ == '__main__':
    # 1 构建参数
    prepare_list = [None, None, None]
    to_list = ["xxx@qq.com"]  # 你的收件箱
    info_list = [None, None, None]
    # 1.1 构建连接邮箱的参数
    em_host = prepare_list[0] = "smtp.163.com"
    em_user = prepare_list[1] = "yyy@163.com"  # 你的发件箱
    em_pass = prepare_list[2] = ""  # 你的发件箱的密码。不是邮箱登陆密码,而是SMTP授权码。登陆你的邮箱,开启SMTP服务获取。


    # 查询是谁登陆在线
    r = cmd("who")
    who_info = r[1]

    ret_list = re.findall(r"S*", who_info)  # who_visitor=ret_list[0]

    ret_list_last = ret_list[-10:]  # 获取最新登陆记录
    who_visitor = ret_list_last[0]

    # 判断一下用户名称是否正确
    who_real = cmd("whoami")[1].rstrip('
')
    # 真实用户名和最后一条who记录相同时
    if who_visitor == who_real:
        who_visitor = ret_list_last[0]
        date_visit = ret_list_last[4]
        time_visit = ret_list_last[6]
        ip_visitor = ret_list_last[8]

    # 真实用户名和最后一条who记录不一致时
    else:
        # 对所有在线用户登陆数据进行重组
        group_list = []
        for n in range(len(ret_list) / 10):
            # print(n)
            tmp_list = ret_list[10 * n:10 * (n + 1)]
            tmp_list[3] = int(tmp_list[2][-1])
            tmp_list[5] = "%s %s" % (tmp_list[4], tmp_list[6])
            tmp_list[7] = datestr2timeint(date_str=tmp_list[5], format='%Y-%m-%d %H:%M')
            # print(tmp_list)
            group_list.append(tmp_list)

        # print()
        # 选出最近登陆的用户记录(时间戳最大值)
        big_list = reduce(lambda x, y: x if x[7] >= y[7] else y, group_list)
        # print(big_list)
        who_visitor = big_list[0]
        date_visit = big_list[4]
        time_visit = big_list[6]
        ip_visitor = big_list[8]

    # 查询访问者ip归属地
    ip_be = ip_belong(ip_visitor)

    # # 查询本机的内网ip
    # sender_ip = get_host_ip()

    # 查询本机的公网ip
    sender_ip_pub = get_host_ip_pub()

    # 编辑邮件信息:标题、正文、附件
    em_head = info_list[0] = "ssh notes %s login %s" % (who_visitor, sender_ip_pub)
    em_content = "Hi andy,
"
    detail_info = (who_visitor, ip_visitor, ip_be, date_visit, time_visit, sender_ip_pub)
    em_content += u"    用户 %s 用 IP:%s 来自:%s 
在 %s %s 该用户成功登陆了服务器:%s.
" % detail_info
    em_content += "
who:
%s
" % who_info
    em_content += "   someone has successfully logged in to your host(%s) through SSH.
 " % sender_ip_pub
    em_content += "  If it is not your operation, please change the password in time.

"
    info_list[1] = em_content
    em_attach = info_list[2] = []  # 邮件附件文件

    flag = False
    flag = send_mail(prepare_list, to_list, info_list)

    # if flag:
    #     print("send_mail run successed")
    # else:
    #     print("send_mail run failed")

  

2、Linux设置ssh登陆成功就触发py2的程序。

参考:https://www.cnblogs.com/stonehe/p/10915279.html

(1)打开centos的相关配置文件

vim /etc/profile

(2)最后一行配置

/usr/bin/python2 /usr/local/system_script/ssh_on_email/ssh_on_notes.py

3、退出测试

如果不行,就单独运行

/usr/bin/python2 /usr/local/system_script/ssh_on_email/ssh_on_notes.py

这是启动邮件发送的,如果邮件发送都不成功,就是配置有问题。

成功后会收到如下邮件:

 延伸:使用linux发送邮件

https://blog.csdn.net/ouyang_peng/article/details/77363437?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-9.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-9.channel_param

原文地址:https://www.cnblogs.com/andy9468/p/13190687.html