自定义 异步 IO 非阻塞框架

框架一

自定义Web异步非阻塞框架

suosuo.py(源码文件头中的文件名为 server_async.py,下文的示例通过 from server_async import ... 导入本文件)

#!/usr/bin/env python
# -*- coding: utf-8 -*-#
# __name__   = Web_Framework.server_async.py 
# __date__   = 2019/9/3  8:21
# __author__ ='Shi Wei'
# __description__: 
# --------------------------------------------------------------
# 服务端
import socket
import select
import re
import time

# Maximum number of bytes requested from a socket by a single recv() call.
Buffer_size = 1024
# Pending-connection queue length passed to listen() on the server socket.
backlog = 5

class HttpRequest:
    """Read and parse one HTTP request from a non-blocking client socket.

    Construction immediately drains everything currently readable from
    ``conn`` and splits it into a header block and a body.

    Attributes:
        socket: the client connection the request was read from.
        recv_data: the raw bytes that were received.
        header_bytes: raw header block (request line + header lines).
        header_dict: header name -> value (surrounding whitespace stripped).
        body_bytes: everything after the blank line; for input that has no
            blank line this holds the whole raw payload.
        method, url, protrol: the three parts of the request line; they stay
            empty strings when the request line is malformed.
    """

    def __init__(self, conn):
        self.socket = conn

        self.header_bytes = b''
        self.header_dict = {}
        self.body_bytes = b''

        self.method = ''
        self.url = ""
        self.protrol = ''  # (sic) attribute name kept for existing callers

        self.initialize()

    def header_str(self):
        """Return the raw header block decoded as UTF-8 text."""
        return str(self.header_bytes, encoding="utf-8")

    def initialize(self):
        """Drain the socket, then split the payload into header and body."""
        chunks = []
        while True:
            try:
                data = self.socket.recv(Buffer_size)
            except OSError:
                # BlockingIOError (nothing left to read on a non-blocking
                # socket) and connection errors both end the read loop.
                data = None
            if not data:
                break
            chunks.append(data)
        self.recv_data = b"".join(chunks)

        # HTTP separates the header block from the body with an empty line.
        head, separator, body = self.recv_data.partition(b"\r\n\r\n")
        if not separator:  # malformed request: no header/body separator
            self.body_bytes = self.recv_data
        else:
            self.header_bytes, self.body_bytes = head, body
            self.initialize_header()

    def initialize_header(self):
        """Parse the request line and the header lines out of the header block."""
        header_list = self.header_str().split("\r\n")
        gen = header_list[0].split(" ")
        if len(gen) != 3:
            return
        self.method, self.url, self.protrol = gen
        # Skip the request line itself: a URL containing ":" must not be
        # mistaken for a header.
        for header in header_list[1:]:
            key, colon, value = header.partition(":")
            if colon:
                self.header_dict[key] = value.strip()

class HttpResponse:
    """Response wrapper around a text payload.

    ``contennt`` (sic) is the text to send; ``headers`` and ``cookies`` start
    out as empty dicts.
    """

    def __init__(self, contennt):
        self.headers = dict()
        self.cookies = dict()
        self.content = contennt

    def response(self):
        """Return the payload encoded as UTF-8 bytes."""
        payload = bytes(self.content, encoding="utf-8")
        return payload

class HttpNotFound(HttpResponse):
    """Response carrying the fixed "404 Not Found" text."""

    def __init__(self):
        super().__init__("404 Not Found!......")

class Future:
    """Placeholder for a result that becomes available later.

    ``callback`` is stored as given; ``ready`` turns true once
    ``set_result`` has been called.
    """

    def __init__(self, callback):
        self.callback = callback
        self.value = None
        self._ready = False

    def set_result(self, value=None):
        """Store ``value`` and mark the future as ready."""
        self._ready = True
        self.value = value

    @property
    def ready(self):
        """Whether a result has been set."""
        return self._ready

class TimeoutFuture(Future):
    """Future that also becomes ready once ``timeout`` seconds have elapsed.

    The clock starts when the object is created.
    """

    def __init__(self, timeout, callback = None):
        super().__init__(callback=callback)
        self.start_time = time.time()
        self.timeout = timeout

    @property
    def ready(self):
        """True after a result was set or once the timeout has expired."""
        deadline = self.start_time + self.timeout
        if time.time() > deadline:
            # Latch the flag so the future stays ready from now on.
            self._ready = True
        return self._ready

class Panda:
    """Single-threaded HTTP server built on select() with polled futures.

    ``routers`` is a sequence of ``(pattern, handler)`` pairs.  A handler is
    called with the parsed request and returns either an ``HttpResponse``
    (sent immediately) or a future-like object exposing ``ready`` and
    ``callback``; such a connection is kept open and answered by
    ``polling_callback`` once the future reports ready.
    """

    # Page sent when a ready future has no callback.
    _NO_CALLBACK_PAGE = (
        "HTTP/1.1 200 OK\r\n"
        "Date: Tue, 03 Sep 2019 01:44:42 GMT\r\n"
        "\r\n"
        '<html><head><meta charset="UTF-8"></head><body>'
        '<h1 align="center" style="margin-top: 200px;">'
        'shiwei is one Future....<br>callback = None!...</h1></body></html>'
    )

    def __init__(self, routers):
        self.routers = routers
        self.shiwei = []  # sockets watched for readability
        self.async_request_handler = {}  # client socket -> pending future
        self._async_requests = {}  # client socket -> request that created the future
        self.request = None  # most recently parsed request

    def run(self, host="127.0.0.1", port=9999):
        """Bind to ``(host, port)`` and serve until an unexpected error occurs."""
        sk = socket.socket()
        sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sk.bind((host, port))
        sk.setblocking(False)
        sk.listen(backlog)
        self.shiwei.append(sk)
        try:
            while True:
                # The short timeout lets pending futures be polled regularly
                # even when no socket is readable.
                rlist, wlist, elist = select.select(self.shiwei, [], [], 0.05)
                for r in rlist:
                    if r is sk:
                        self._accept(sk)
                    else:
                        self._serve(r)

                self.polling_callback()  # answer requests whose future is ready
        except Exception as e:
            print("server_socket------->Exception msg=", e)
        finally:
            if sk in self.shiwei:
                self.shiwei.remove(sk)
            sk.close()

    def _accept(self, sk):
        """Accept one client from the listening socket and start watching it."""
        try:
            conn, addr = sk.accept()
        except OSError:
            return  # the client went away before it could be accepted
        conn.setblocking(False)
        self.shiwei.append(conn)

    def _serve(self, conn):
        """Handle a readable client: answer now or park it until its future is ready."""
        res = self.process(conn)
        if isinstance(res, HttpResponse):
            self._send_and_close(conn, res.response())
        elif hasattr(res, "ready"):
            self.async_request_handler[conn] = res
            # Remember which request produced this future; self.request is
            # overwritten by every later request.
            self._async_requests[conn] = self.request
            # Stop watching the socket while the future is pending so that
            # further readability events do not re-process the connection.
            if conn in self.shiwei:
                self.shiwei.remove(conn)
        else:
            print("server_socket------->unsupported handler result:", repr(res))
            self._send_and_close(conn, b"")

    def _send_and_close(self, conn, data):
        """Send ``data`` and always release the connection afterwards."""
        try:
            conn.sendall(data)
        except OSError as e:
            # A failing client must not take the whole server down.
            print("client_socket------->Exception msg=", e)
        finally:
            if conn in self.shiwei:
                self.shiwei.remove(conn)
            conn.close()

    def polling_callback(self):
        """Answer and close every connection whose future has become ready."""
        finished = [conn for conn, future in self.async_request_handler.items()
                    if future.ready]
        for conn in finished:
            future = self.async_request_handler.pop(conn)
            request = self._async_requests.pop(conn, self.request)
            if future.callback:
                payload = future.callback(request, future).response()
            else:
                payload = bytes(self._NO_CALLBACK_PAGE, encoding="utf-8")
            self._send_and_close(conn, payload)

    def process(self, conn):
        """Parse the request on ``conn`` and run the first matching route handler.

        Returns the handler's result, or an ``HttpNotFound`` when no route
        matches or the matching route has no handler.
        """
        self.request = HttpRequest(conn)
        for route in self.routers:
            if re.match(route[0], self.request.url):
                func = route[1]
                if func:
                    return func(self.request)
                break
        return HttpNotFound()




使用 suosuo 框架 开发 一个小网站

#!/usr/bin/env python
# -*- coding: utf-8 -*-#
# __name__   = Web_Framework.start01.py 
# __date__   = 2019/9/3  9:55
# __author__ ='Shi Wei'
# __description__: 
# --------------------------------------------------------------
from server_async import Panda

from server_async import HttpResponse
from server_async import TimeoutFuture

def shi(request):
    """Handler for ``/shi``: log the request in red and answer immediately."""
    try:
        # \033[1;31m switches the terminal to bold red, \033[0m resets it.
        print("\033[1;31m url=%s--->header_dict=%s\033[0m" % (request.url, request.header_dict))
    except Exception:
        # Logging is best-effort; it must never prevent the response.
        pass
    return HttpResponse("the King  of Northern!....")

def www(request, future):
    """Future callback: build the response sent once the future is ready."""
    reply = HttpResponse("is future callback func ........")
    return reply

def wei(request):
    """Handler for ``/wei``: log the request in red and defer the answer.

    Returns a ``TimeoutFuture`` of 5 seconds whose callback is ``www``.
    """
    try:
        # \033[1;31m switches the terminal to bold red, \033[0m resets it.
        print("\033[1;31m url=%s--->header_dict=%s\033[0m" % (request.url, request.header_dict))
    except Exception:
        # Logging is best-effort; it must never prevent the response.
        pass
    return TimeoutFuture(5, callback=www)
    # return HttpResponse("is wei func ")

# Route table handed to Panda: (regex pattern, handler) pairs.
routers = [
    (r"/shi", shi),
    (r'/wei', wei),
]

# Build the server from the route table and start serving on port 9999.
asy = Panda(routers)
asy.run(port = 9999)






框架二

异步非阻塞 server 端

#!/usr/bin/env python
# -*- coding: utf-8 -*-#
# __name__   = Web_Framework.server_suosuo.py
# __date__   = 2019/8/28  19:10
# __author__ ='Shi Wei'
# __description__: 
# --------------------------------------------------------------

"""   异步 非 阻塞  Server  端  """

import socket
import select
import time

# Maximum number of bytes requested from a socket by a single recv() call.
Buffer_size = 1024
class HttpRequest:
    """Read and parse one HTTP request from a non-blocking client socket.

    Construction immediately drains everything currently readable from
    ``conn`` and splits it into a header block and a body.

    Attributes:
        conn: the client connection the request was read from.
        recv_data: the raw bytes that were received.
        header_bytes: raw header block (request line + header lines).
        header_dict: header name -> value (surrounding whitespace stripped).
        body_bytes: everything after the blank line; for input that has no
            blank line this holds the whole raw payload.
        method, url, protocol: the three parts of the request line; they
            stay empty strings when the request line is malformed.
    """

    def __init__(self, conn):
        self.conn = conn
        self.recv_data = b''
        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()

        self.method = ""
        self.url = ''
        self.protocol = ""

        self.initialize()             # receive and parse the data

    def initialize(self):
        """Drain the socket, then split the payload into header and body."""
        chunks = []
        while 1:
            try:
                data = self.conn.recv(Buffer_size)
            except OSError:
                # BlockingIOError (nothing left to read on a non-blocking
                # socket) and connection errors both end the read loop.
                data = None
            if not data:
                break
            chunks.append(data)
        self.recv_data += b"".join(chunks)

        # HTTP separates the header block from the body with an empty line.
        header, separator, body = self.recv_data.partition(b"\r\n\r\n")
        if not separator:    # badly behaved client: no header/body separator
            self.body_bytes = self.recv_data
        else:                # well-formed request
            self.header_bytes += header
            self.body_bytes += body
            self.initialize_headers()   # split the header block into fields

    @property
    def header_str(self):
        """The raw header block decoded as UTF-8 text."""
        return str(self.header_bytes, encoding="utf-8")

    def initialize_headers(self):
        """Parse the request line and the header lines out of the header block."""
        header_list = self.header_str.split('\r\n')
        first_header = header_list[0].split(' ')
        if len(first_header) != 3:
            return
        self.method, self.url, self.protocol = first_header
        # Skip the request line, and split each header on the FIRST colon
        # only so values such as "127.0.0.1:80" are kept.
        for header in header_list[1:]:
            key, colon, value = header.partition(':')
            if colon:
                self.header_dict[key] = value.strip()

def Index(request):
    """Handler for ``/index``: return the plain-text index page."""
    return "is Index Page"
    # Alternative from the original source: answer with a bare HTTP header block.
    # return "HTTP/1.1 200 OK\r\nDate:Fri, 22 May 2009 06:07:21 GMT\r\nContent-Type: text/html; charset=UTF-8\r\n\r\n"

def Main(request):
    """Handler for ``/main``: return the plain-text main page."""
    page = " Main  Page Hello ......"
    return page

def All(request):
    """Catch-all handler: return a complete HTTP response with an HTML body."""
    # The status line must start at the very first byte (no leading space),
    # and header lines end with CRLF; an empty line ends the header block.
    return (
        "HTTP/1.1 200 OK\r\n"
        "Date:Fri, 22 May 2009 06:07:21 GMT\r\n"
        "Content-Type: text/html; charset=UTF-8\r\n"
        "\r\n"
        "<html><head></head><body>shiwei</body></html>"
    )

# Route table scanned in order by run(); each pattern is applied with
# re.match and the first match wins.  '/' is listed last and matches any URL
# that starts with '/'.
routers = [
    ("/index$", Index),
    ('/main$', Main),
    ('/', All),
]


def run(host="127.0.0.1", port=80, backlog=5):
    """Serve HTTP requests forever with a select()-based event loop.

    Each readable client is parsed into an ``HttpRequest``, dispatched to the
    first handler in ``routers`` whose pattern matches the URL, answered with
    the handler's text (or a 404 text when nothing matches) and then closed.

    Args:
        host: address to bind to.
        port: TCP port to listen on.
        backlog: pending-connection queue length passed to ``listen``.
    """
    import re  # function-local as in the original, but no longer run per request

    shiwei = []  # sockets watched for readability (listener + clients)
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sk.bind((host, port))
    sk.listen(backlog)
    shiwei.append(sk)

    try:
        while 1:
            rlist, wlist, elist = select.select(shiwei, [], [], 0.05)

            for r in rlist:
                if r is sk:
                    print('服务端 socket=', r)
                    conn, addr = sk.accept()
                    conn.setblocking(False)
                    shiwei.append(conn)
                    continue

                # A client sent data:
                # 1. read the URL from the request line
                # 2. look up the matching handler in the route table
                # 3. call the handler to obtain the response text
                # 4. send the text back and close the connection
                print("客户端 socket =", r)
                request = HttpRequest(r)
                print("url=", request.url)
                func = None
                for pattern, handler in routers:
                    if re.match(pattern, request.url):
                        func = handler
                        break
                result = func(request) if func else "404 Not Page...."
                try:
                    # sendall keeps writing until the whole payload is out;
                    # send() may transmit only part of it.
                    r.sendall(bytes(result, encoding="utf-8"))
                except OSError as e:
                    # A failing client must not take the whole server down.
                    print("client socket error:", e)
                finally:
                    shiwei.remove(r)
                    r.close()
                print("url=%s,--->result=%s" % (request.url, result))
    finally:
        sk.close()
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    run()




异步 非 阻塞 Client 端

#!/usr/bin/env python
# -*- coding: utf-8 -*-#
# __name__   = Web_Framework.client_suosuo.py 
# __date__   = 2019/8/29  8:34
# __author__ ='Shi Wei'
# __description__: 
# --------------------------------------------------------------

"""   异步 非 阻塞 Client  端  """
import socket
import select

# Maximum number of bytes requested from a socket by a single recv() call.
Buffer_size = 1024

class HttpRequest:
    """One outgoing request: a socket plus the URL, host and callback for it.

    Because it provides ``fileno()``, an instance can be handed to
    ``select.select`` in place of the socket itself.
    """

    def __init__(self, sk, url, host, callback):
        self.socket, self.url = sk, url
        self.host, self.callback = host, callback

    def fileno(self):
        """Return the file descriptor of the wrapped socket."""
        descriptor = self.socket.fileno()
        return descriptor

class HttpResponse:
    """Parse the raw bytes of an HTTP response.

    Attributes:
        recv_data: the raw bytes as received.
        http_version: version from the status line, e.g. ``"1.1"``.
        status_code: status code as text, e.g. ``"404"``.
        status_msg: reason phrase, e.g. ``"Not Found"`` (may be empty).
        header_bytes: raw header block (status line + header lines).
        header_dict: header name -> value (surrounding whitespace stripped).
        body_bytes: everything after the blank line; for data that has no
            blank line this holds the whole raw payload.
    """

    def __init__(self, recv_data):
        self.recv_data = recv_data

        self.http_version = ""
        self.status_code = ''
        self.status_msg = ""

        self.header_bytes = b''
        self.header_dict = {}
        self.body_bytes = b''
        self.initialize()

    @property
    def header_str(self):
        """The raw header block decoded as UTF-8 text."""
        return self.header_bytes.decode("utf-8")

    def initialize(self):
        """Split the payload into header block and body."""
        head, separator, body = self.recv_data.partition(b"\r\n\r\n")
        if not separator:
            # No blank line: the server did not send a regular HTTP message.
            self.body_bytes = self.recv_data
        else:
            # Regular message; the body may legitimately be empty.
            self.header_bytes = head
            self.body_bytes = body
            self.initialize_header()

    def initialize_header(self):
        """Parse the status line and the header lines out of the header block."""
        header_list = self.header_str.split("\r\n")
        # Split at most twice: the reason phrase may itself contain spaces
        # ("404 Not Found") or be missing altogether.
        temp = header_list[0].split(" ", 2)
        if len(temp) < 2:
            return
        ht = temp[0].split('/')
        if len(ht) == 2:
            self.http_version = ht[1]
        self.status_code = temp[1]
        self.status_msg = temp[2] if len(temp) == 3 else ""
        # Skip the status line, and split each header on the FIRST colon only
        # so values that contain ":" (dates, host:port) are kept.
        for header in header_list[1:]:
            key, colon, value = header.partition(':')
            if colon:
                self.header_dict[key] = value.strip()

class AsyncRequest:
    """Issue several HTTP GET requests concurrently from a single thread.

    Requests are registered with ``add_request`` and driven by ``run``, which
    returns once every registered request has been answered or has failed.
    """

    def __init__(self):
        self.shiwei = []  # requests still waiting for their response
        self.haiyan = []  # requests whose GET has not been sent yet

    def add_request(self, url, host, callback, port=80):
        """Start a non-blocking connect to ``host`` and register the request.

        Args:
            url: path to request, e.g. ``"/index"``.
            host: server name or address; also sent as the Host header.
            callback: called with the parsed ``HttpResponse``.
            port: TCP port to connect to.
        """
        sk = socket.socket()
        sk.setblocking(False)
        try:
            sk.connect((host, port))
        except BlockingIOError:
            # Expected for a non-blocking connect: it completes in the
            # background and the socket turns writable once it is done.
            pass
        except OSError as e:
            # The connection cannot even be started (e.g. unknown host).
            print("connect to %s failed: %s" % (host, e))
            sk.close()
            return
        request = HttpRequest(sk, url, host, callback)
        self.shiwei.append(request)
        self.haiyan.append(request)

    def _drop(self, request):
        """Stop tracking ``request`` and close its socket."""
        for pending in (self.shiwei, self.haiyan):
            if request in pending:
                pending.remove(request)
        request.socket.close()

    def run(self):
        """Drive all registered requests until each is answered or has failed."""
        received = {}  # request -> list of chunks received so far
        while self.shiwei:
            rlist, wlist, elist = select.select(self.shiwei, self.haiyan, self.shiwei, 0.05)

            # Writable: the connect finished, so the request can be sent.
            for w in wlist:
                sendstr = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (w.url, w.host)
                self.haiyan.remove(w)
                try:
                    w.socket.sendall(sendstr.encode("utf-8"))
                except OSError as e:
                    # Typically a refused connection.
                    print("request to %s failed: %s" % (w.host, e))
                    self._drop(w)

            # Readable: collect data; the response is complete once the
            # server closes the connection (HTTP/1.0 behaviour).
            for r in rlist:
                if r not in self.shiwei:
                    continue  # already dropped in this round
                chunks = received.setdefault(r, [])
                finished = False
                while 1:
                    try:
                        data = r.socket.recv(Buffer_size)
                    except BlockingIOError:
                        break  # nothing more for now; wait for the next round
                    except OSError:
                        finished = True
                        break
                    if not data:
                        finished = True
                        break
                    chunks.append(data)
                if not finished:
                    continue
                response = HttpResponse(b"".join(received.pop(r)))
                self._drop(r)
                r.callback(response)

            # Sockets reported in an exceptional state cannot complete.
            for broken in elist:
                if broken in self.shiwei:
                    received.pop(broken, None)
                    self._drop(broken)

def f1(response):
    """Callback: print the response body in bold dark grey ("save to file")."""
    # \033[1;30m selects the colour, \033[0m resets it; undecodable bytes are
    # replaced so a non-UTF-8 page cannot crash the callback.
    print("\033[1;30m 保存到文件中, 接收到的数据为: %s \033[0m" % (str(response.body_bytes, encoding="utf-8", errors="replace")))

def f2(response):
    """Callback: print the response body on a yellow background ("save to database")."""
    # \033[1;43m selects the colour, \033[0m resets it; undecodable bytes are
    # replaced so a non-UTF-8 page cannot crash the callback.
    print("\033[1;43m 保存到数据库中,接收到的数据为: %s \033[0m" % (str(response.body_bytes, encoding="utf-8", errors="replace")))

# Requests to issue: each dict supplies the keyword arguments of
# AsyncRequest.add_request (url, host, callback).
urls = [
    # {"url": "/", "host": 'www.baidu.com', "callback": f1},
    # {"url": "/", "host": 'www.autohome.com', "callback": f2},
    {"url": "/index", "host": '127.0.0.1', "callback": f2},
    {"url": "/main", "host": "127.0.0.1", "callback": f1},
]

# Shared client instance that the script below fills and runs.
asy = AsyncRequest()

# When executed as a script: register every request, show the pending list,
# then drive all requests to completion.
if __name__ == '__main__':
    for tem in urls:
        asy.add_request(**tem)
    print(asy.shiwei)
    asy.run()


Wupeiqi--Web Framework

原文地址:https://www.cnblogs.com/shiwei1930/p/11728086.html