faker smtp server

  1 import os
  2 import asyncio
  3 import logging
  4 import base64
  5 from email import message_from_bytes
  6 from email.message import Message
  7 from datetime import datetime
  8 
  9 import aiosmtpd
 10 from aiosmtpd.controller import Controller
 11 from aiosmtpd.smtp import SMTP as Server, syntax
 12 from jinja2 import Template
 13 
 14 mail_path = "mails"
 15 hostname = "0.0.0.0"
 16 port = 8025
 17 
 18 html = """
 19 <!DOCTYPE html>
 20 <html lang="en">
 21 <head>
 22     <meta charset="UTF-8">
 23     <title>email</title>
 24 </head>
 25 <body>
 26 <div><span>发件人: </span><span>{{ from_addr|e }}</span></div>
 27 <div><span>收件人: </span><span>{{ to_addr|e }}</span></div>
 28 <div><span>主题: </span><span>{{ subject }}</span></div>
 29 <div>
 30     {{ payload }}
 31 </div>
 32 </body>
 33 </html>
 34 """
 35 
 36 
 37 class ExampleHandler:
 38     async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
 39         envelope.rcpt_tos.append(address)
 40         return "250 OK"
 41 
 42     async def handle_DATA(self, server, session, envelope: aiosmtpd.smtp.Envelope):
 43         message: Message = message_from_bytes(envelope.content)
 44         message_info = await self.parse_message(message)
 45         template = Template(html)
 46         if not os.path.exists(mail_path):
 47             os.makedirs(mail_path)
 48         with open(os.path.join(mail_path, f"mail_{datetime.now().strftime('%Y-%m-%d-%H_%M_%S_%f')[:-3]}.html"), "w") as f:
 49             f.write(template.render(message_info))
 50         return "250 Message accepted for delivery"
 51 
 52     def get(self, message, item):
 53         value = message.get(item)
 54         try:
 55             value = self.to_true_str(value)
 56         except Exception:
 57             pass
 58         return value
 59 
 60     async def parse_message(self, message: Message):
 61         self.charset = message.get_content_charset() or "utf-8"
 62         payload = message.get_payload()
 63         subject = self.get(message, "Subject")
 64         from_addr = self.get(message, "From")
 65         to_addr = self.get(message, "To")
 66         try:
 67             if isinstance(payload, (list, tuple)):
 68                 payload = self.parse_payload(payload)
 69         except Exception:
 70             pass
 71         return {"subject": subject, "payload": payload, "from_addr": from_addr, "to_addr": to_addr}
 72 
 73     def parse_payload(self, payload):
 74         # todo 暂时不处理附件的问题,目前仅处理 text/html 与 text/plain 共存的情况
 75         data = None
 76         for item in payload:
 77             if isinstance(item, Message):
 78                 data = item.get_payload()
 79                 if item.get_content_type == "text/html":
 80                     break
 81 
 82         try:
 83             # 测试发现 html 有概率是转 base64
 84             data = self.to_true_str(data)
 85         except Exception:
 86             pass
 87 
 88         return data
 89 
 90     def to_true_str(self, raw: str, charset=None):
 91         if raw.startswith("=?"):
 92             tmp_list = raw.split("?")
 93             if len(tmp_list) > 2:
 94                 raw = tmp_list[-2]
 95             charset = tmp_list[1]
 96         else:
 97             charset = self.charset
 98         return base64.b64decode(raw).decode(charset)
 99 
100     async def handle_EHLO(self, *args, **kwargs):
101         return """
102 250-mail
103 250-PIPELINING
104 250-AUTH LOGIN PLAIN
105 250-AUTH=LOGIN PLAIN
106 250-coremail
107 250-STARTTLS
108 250-SMTPUTF8
109 250 8BITMIME"""
110 
111 
112 class MyServer(Server):
113 
114     @syntax("AUTH PLAIN")
115     @asyncio.coroutine
116     def smtp_AUTH(self, PLAIN, *args, **kwargs):
117         yield from self.push("235 auth successfully")
118 
119     @syntax("EHLO hostname")
120     async def smtp_EHLO(self, hostname):
121         status = await self._call_handler_hook("EHLO", hostname)
122         self.session.host_name = hostname
123         await self.push(status)
124 
125 
126 class MyController(Controller):
127     def factory(self):
128         return MyServer(self.handler)
129 
130 
131 async def amain(loop):
132     controller = MyController(ExampleHandler(), hostname=hostname, port=port)
133     controller.start()
134 
135 
136 if __name__ == "__main__":
137     logging.basicConfig(level=logging.ERROR)
138     loop = asyncio.get_event_loop()
139     loop.create_task(amain(loop=loop))
140     try:
141         loop.run_forever()
142     except KeyboardInterrupt:
143         pass
原文地址:https://www.cnblogs.com/twotigers/p/10528736.html