11_IO多路复用

1.IO概述

    input 和 output: 是在内存中存在的数据交换操作

    内存和磁盘交换: 文件读写, 打印

    内存和网络交换: recv send recvfrom, sendto

    IO密集型程序: 程序中执行大量的IO操作,而较少的需要CPU运行,消耗CPU资源少,运行周期往往比较长

    CPU密集型程序: 程序中执行大量的CPU运算,而较少的需要IO操作,消耗CPU资源多

2.IO分类(阻塞/非阻塞)

    阻塞IO: 默认形态,是效率最低的一种IO
        1.因为等待某种条件达成再继续运行,例如: accept recv input
        2.处理IO事件时耗时较长也会产生阻塞,例如: 文件的读写过程,网络数据的传输过程

    非阻塞IO: 通过修改IO对象使其变为非阻塞状态,通常用循环来不断判断阻塞条件,需要消耗更多的CPU但是在一定程度上提高了IO效率

    IO多路复用: 通过同时监控多个IO事件,当那个IO事件就绪就执行那个IO事件,形成并发效果,也被称作事件驱动IO

    信号驱动IO: 准备数据阶段(用户进程非阻塞),复制数据阶段(用户进程阻塞)
        1.当调用read函数的时候,准备数据的过程中用户线程不阻塞,用户线程可以去做其他事情
        2.等到数据准备完了,用户进程会收到一个SIGIO信号,然后可以在信号处理函数中处理数据

    异步IO: Python实现不了,但是tornado框架自带异步IO
        1.阻塞IO,非阻塞IO,多路复用IO,信号驱动IO都是同步IO,都需等待将数据从内核缓冲区拷贝到用户内存,会阻塞进程
        2.异步IO是异步IO,当用户发起一个操作后就立即去做其他事情了,内核会等待,检测数据直到数据准备完成
        3.然后将数据拷贝到用户内存(无需先拷贝到内核缓冲区),这一切完成后会给用户进程发送一个信号告知操作完成了

3.非阻塞IO-效率比阻塞IO高

1.非阻塞IO设置方法

import socket

s = socket.socket()  # 默认会创建流式套接字

# 修改套接字设置的阻塞状态
# 参数(bool类型): 默认为True,设置为False则为非阻塞
s.setblocking()

2.TCP服务端-非阻塞IO

import socket
import time


def main():
    # 创建数据流套接字
    tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置端口重用
    tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 绑定端口,运行局域网内的客户端连接
    tcp_server_socket.bind(("0.0.0.0", 7890))
    # 让默认的套接字由主动变为被动监听 listen
    tcp_server_socket.listen(128)
    # 设置套接字为非阻塞状态
    tcp_server_socket.setblocking(False)

    # 循环目的: 多次调用accept等待客户端连接,为多个客服端服务
    while True:
        print("等待一个新的客户端的到来...")
        try:
            new_client_socket, client_addr = tcp_server_socket.accept()
        except BlockingIOError:
            time.sleep(1)
            print(time.ctime())
            continue
        print("客户端' %s '已经到来" % str(client_addr))

        # 设置客户端字为非阻塞状态
        new_client_socket.setblocking(False)

        # 循环目的: 为同一个客服端服务多次
        while True:
            try:
                # 接收客户端发送过来的请求
                recv_data = new_client_socket.recv(1024)
                # recv解堵塞的两种方式: 1.客户端发送过来数据,2.客户端调用了close
                if recv_data:
                    print("客户端' %s '发送过来的请求是: %s" % (str(client_addr), recv_data.decode("utf-8")))
                    # 回送数据给客户端表示响应客户端的请求
                    send_data = "----ok----Request accepted"
                    new_client_socket.send(send_data.encode("utf-8"))
                else:
                    break
            except BlockingIOError:
                time.sleep(1)
                print(time.time())
                continue

        # 关闭accept返回的套接字
        new_client_socket.close()
        print("已经为 %s 客户端已经服务完毕" % str(client_addr))
    # 关闭监听套接字
    tcp_server_socket.close()


if __name__ == "__main__":
    main()

3.TCP客户端-非阻塞IO

import socket


def main():
    # 创建数据流套接字
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接服务器
    server_ip = input("请输入要连接的服务器的ip:")
    serve_port = int(input("请输入要连接的服务器的port:"))
    server_addr = (server_ip, serve_port)
    tcp_client_socket.connect(server_addr)
    # 发送数据
    send_data = input("请输入要发生的数据:")
    tcp_client_socket.send(send_data.encode("utf-8"))
    # 接收服务器发送过来的数据
    recv_data = tcp_client_socket.recv(1024)
    print("接收到的数据为:%s" % recv_data.decode("utf-8"))
    # 关闭套接字
    tcp_client_socket.close()


if __name__ == "__main__":
    main()

4.超时检测(超时等待)-效率比非阻塞IO高

1.超时检测设置

import socket

s = socket.socket()  # 默认会创建流式套接字

# 设置套接字的超时检测,所谓的超时检测即对原本阻塞的函数进行设置,使其不再始终阻塞,而是阻塞等待一定时间后自动返回
# 参数: 超时时间,在规定时间中如果正常接收阻塞则继续执行否则产生timeout异常
s.settimeout(5)  # 设置超时时间为5秒

2.TCP服务端-超时检测

import socket
import traceback


def main():
    # 创建数据流套接字
    tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置端口重用
    tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 绑定端口,运行局域网内的客户端连接
    tcp_server_socket.bind(("0.0.0.0", 7890))
    # 让默认的套接字由主动变为被动监听 listen
    tcp_server_socket.listen(128)
    # 设置套接字超时等待时间为5秒
    tcp_server_socket.settimeout(5)

    # 循环目的: 多次调用accept等待客户端连接,为多个客服端服务
    while True:
        print("等待一个新的客户端的到来...")
        try:
            new_client_socket, client_addr = tcp_server_socket.accept()
        except Exception:
            traceback.print_exc()
            continue
        print("客户端' %s '已经到来" % str(client_addr))

        # 设置客户端字的超时等待时间为5秒
        new_client_socket.settimeout(5)

        # 循环目的: 为同一个客服端服务多次
        while True:
            try:
                # 接收客户端发送过来的请求
                recv_data = new_client_socket.recv(1024)
                # recv解堵塞的两种方式: 1.客户端发送过来数据,2.客户端调用了close
                if recv_data:
                    print("客户端' %s '发送过来的请求是: %s" % (str(client_addr), recv_data.decode("utf-8")))
                    # 回送数据给客户端表示响应客户端的请求
                    send_data = "----ok----Request accepted"
                    new_client_socket.send(send_data.encode("utf-8"))
                else:
                    break
            except Exception:
                traceback.print_exc()
                continue

        # 关闭accept返回的套接字
        new_client_socket.close()
        print("已经为 %s 客户端已经服务完毕" % str(client_addr))
    # 关闭监听套接字
    tcp_server_socket.close()


if __name__ == "__main__":
    main()

3.TCP客户端-超时检测

import socket


def main():
    # 创建数据流套接字
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接服务器
    server_ip = input("请输入要连接的服务器的ip:")
    serve_port = int(input("请输入要连接的服务器的port:"))
    server_addr = (server_ip, serve_port)
    tcp_client_socket.connect(server_addr)
    # 发送数据
    send_data = input("请输入要发生的数据:")
    tcp_client_socket.send(send_data.encode("utf-8"))
    # 接收服务器发送过来的数据
    recv_data = tcp_client_socket.recv(1024)
    print("接收到的数据为:%s" % recv_data.decode("utf-8"))
    # 关闭套接字
    tcp_client_socket.close()


if __name__ == "__main__":
    main()

5.IO多路复用-效率比超时等待高

1.IO多路复用概述

        1.IO多路复用的定义: 同时监控多个IO事件,当那个IO事件就绪就执行那个IO事件,形成并发效果

        2.使用IO多路复用的注意点
            1.在处理IO过程中不应该发生死循环(即某个IO单独占用服务器)
            2.IO多路复用是单进程程序,是一个并发程序
            3.IO多路复用有较高的IO执行效率

2.IO多路复用-select模块中的三个方法(select,pool,epool)

        1.三个方法的适用平台
            select: 适用于Windows Linux Unix MacOS
            poll: 适用于Linux Unix MacOS
            epoll: 适用于Linux Unix

        2.select方法-采用轮询机制,32位机文件描述符只能开1024,64位机文件描述符只能开2048
            r,w,x = select(rlist, wlist, xlist, [timeout])
                功能: 监控IO事件,阻塞等待IO事件发生
                参数:
                    rlist: 列表,存放要监控等待处理的IO,例如客户端主动连接,或客户端发送消息
                    wlist: 列表,存放希望主动处理的IO,例如回送消息给客户端
                    xlist: 列表,存放如果发生异常需要处理的IO
                    timeout: 数字,超时检测,默认一直阻塞
                返回值:
                    r: 列表,rlist中准备就绪的IO
                    w: 列表,wlist中准备就绪的IO
                    x: 列表,xlist中准备就绪的IO

        3.poll方法-采用轮询机制突破了文件描述符只能开1024的限制(1G内存大概能开10万个事件去监听),效率上和select差不多
            1.创建poll对象
                p = select.poll()
            2.加入关注的IO
                p.register(s)  # 添加IO关注
                p.unregister(s)  # 从关注IO中删除
            3.使用poll函数监控
                events = p.poll()
                功能: 阻塞等待register的事件只要有任意准备就绪即返回
                返回值: events  # [(fileno, event), (), ()]; fileno: 文件描述符; event: 准备就绪的事件
            4.处理发生的IO事件
                # poll的IO事件
                POLLIN  # 可读rlist
                POLLOUT  # 可写wlist
                POLLUP  # 断开连接
                POLLERR  # 异常xlist
                POLLPRI  # 紧急处理
                POLLVAL  # 无效数据

        4.epoll方法: 采用了回调机制同样也突破了文件描述符只能开1024的限制(1G内存大概能开10万个事件去监听)
            1.效率上比poll和select稍高,但只能用于Linux和Unix平台
            2.epoll既可以采用水平触发,也可以采用边缘触发,而select和pool只支持水平触发
            3.epoll实现原理提流程图: https://www.processon.com/view/link/5efd699c7d9c08442043e5b8
            4.水平触发: 如果文件描述符已经就绪可以非阻塞的执行IO操作了,此时会触发通知,允许在任意时刻重复检测IO的状态,
                没有必要每次描述符就绪后尽可能多的执行IO,select和pool就属于水平触发
            5.边缘触发: 如果文件描述符自上次状态改变后有新的IO活动到来,此时会触发通知,在收到一个IO事件通知后要尽可能多的执行IO操作,
                因为如果在一次通知中没有执行完IO那么就需要等到下一次新的IO活动到来才能获取到就绪的描述符,信号驱动式IO就属于边缘触发

3.TCP服务端-select方法实现IO多路复用

import socket
import sys
import traceback
import select


def main():
    # 创建数据流套接字
    tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置端口重用
    tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 绑定端口,运行局域网内的客户端连接
    tcp_server_socket.bind(("0.0.0.0", 7890))
    # 让默认的套接字由主动变为被动监听 listen
    tcp_server_socket.listen(128)
    # 设置套接字超时等待时间为5秒
    tcp_server_socket.settimeout(5)

    # 创建一个列表存放要监控等待处理的IO
    rlist = [tcp_server_socket]
    # 创建一个列表存放希望主动处理的IO
    wlist = list()
    # 创建一个列表存放如果发生异常需要处理的IO
    xlist = [tcp_server_socket]

    # 循环目的: 多次调用select监控客户端连接事件,阻塞等待客户端连接事件发生
    while True:
        print("监控是否有新的客户端的到来...")
        # 参数说明: rlist: 列表,存放要监控等待处理的IO; wlist: 列表,存放希望主动处理的IO
        # 参数说明: xlist: 列表,存放如果发生异常需要处理的IO; timeout: 数字,超时检测,默认一直阻塞
        # 返回值说明: rs: 列表,rlist中准备就绪的IO; ws: 列表,wlist中准备就绪的IO; xs: 列表,xlist中准备就绪的IO
        # wlist有内容,select会解除阻塞立即返回
        rs, ws, xs = select.select(rlist, wlist, xlist, 5)  # 设置超时等待时间为5秒

        # 遍历准备就绪的IO列表
        for r in rs:
            # 有客户端连接时
            if r is tcp_server_socket:
                new_client_socket, client_addr = r.accept()
                print("客户端' %s '已经到来" % str(client_addr))
                # 将为客户端服务的套接字加入监控列表rlist
                rlist.append(new_client_socket)
            else:
                try:
                    # 已经连接的客户端发送消息时
                    recv_data = r.recv(1024)
                    if recv_data:
                        # r.getsockname()可以获取套接字绑定的地址
                        print("客户端' %s '发送过来的请求是: %s" % (str(r.getsockname()), recv_data.decode("utf-8")))
                        # 服务器回送消息放到主动处理列表wlist
                        wlist.append(r)
                    else:
                        # 如果客户端是调用了close断开了连接,那么需要从监控列表中删除为客户端服务的套接字
                        rlist.remove(r)
                        print("已经为 %s 客户端已经服务完毕" % str(r.getsockname()))
                        # 关闭为客户端服务的套接字
                        r.close()
                except Exception:
                    traceback.print_exc()
        # 遍历需要服务器主动处理的IO列表
        for w in ws:
            # 回送数据给客户端表示响应客户端的请求
            send_data = "----ok----Request accepted"
            w.send(send_data.encode("utf-8"))
            # 把为客户端服务的套接字从主动处理的列表中删除
            wlist.remove(w)
        # 遍历异常请求的IO列表
        for x in xs:
            # 如果监听套接字发生了异常
            if x is tcp_server_socket:
                # 关闭异常的监听套接字
                x.close()
                # 退出程序
                sys.exit(1)


if __name__ == "__main__":
    main()

4.TCP客户端-select方法实现IO多路复用

import socket


def main():
    # 创建数据流套接字
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接服务器
    server_ip = input("请输入要连接的服务器的ip:")
    serve_port = int(input("请输入要连接的服务器的port:"))
    server_addr = (server_ip, serve_port)
    tcp_client_socket.connect(server_addr)
    # 发送数据
    send_data = input("请输入要发生的数据:")
    tcp_client_socket.send(send_data.encode("utf-8"))
    # 接收服务器发送过来的数据
    recv_data = tcp_client_socket.recv(1024)
    print("接收到的数据为:%s" % recv_data.decode("utf-8"))
    # 关闭套接字
    tcp_client_socket.close()


if __name__ == "__main__":
    main()

5.TCP服务器-pool方法实现IO多路复用

import socket
import select
import traceback
import sys


def main():
    # 创建数据流套接字
    tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置端口重用
    tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 绑定端口,运行局域网内的客户端连接
    tcp_server_socket.bind(("0.0.0.0", 7890))
    # 让默认的套接字由主动变为被动监听 listen
    tcp_server_socket.listen(128)
    # 设置套接字超时等待时间为5秒
    tcp_server_socket.settimeout(5)

    # 创建一个IO事件地图
    fdmap = {tcp_server_socket.fileno(): tcp_server_socket}
    # 创建poll对象
    epool_list = select.poll()
    # 将监听套接对应的fd注册到poll中进行监听,如果fd已经注册过,则会发生异常
    epool_list.register(tcp_server_socket, (select.POLLIN | select.POLLERR))

    # 循环目的: 等待新的客户端连接或者已连接的客户端发送过来数据
    while True:
        print("开始事件监控")
        events = epool_list.poll()
        for fd, event in events:
            if fd == tcp_server_socket.fileno():
                new_client_socket, client_addr = tcp_server_socket.accept()
                print("客户端' %s '已经到来" % str(client_addr))
                epool_list.register(new_client_socket, select.POLLIN)
                # 记录这个信息
                fdmap[new_client_socket.fileno()] = new_client_socket
            elif event & select.POLLIN:
                try:
                    # 已经连接的客户端发送消息时
                    recv_data = fdmap[fd].recv(1024)
                    if not recv_data:
                        # 在poll中注销客户端的信息
                        epool_list.unregister(fd)
                        print("已经为客户端已经服务完毕")
                        # 关闭客户端的文件句柄
                        fdmap[fd].close()
                        # 在字典中删除与已关闭客户端相关的信息
                        del fdmap[fd]
                    else:
                        print("客户端发送过来的请求是: %s" % (recv_data.decode("utf-8")))
                        # 回送数据给客户端表示响应客户端的请求
                        send_data = "----ok----Request accepted"
                        fdmap[fd].send(send_data.encode("utf-8"))
                except Exception:
                    traceback.print_exc()
            elif event & select.POLLERR:
                # 如果监听套接字发生了异常
                if fd is tcp_server_socket.fileno():
                    # 关闭异常的监听套接字
                    fdmap[fd].close()
                    # 退出程序
                    sys.exit(1)


if __name__ == "__main__":
    main()

6.TCP客户端-pool方法实现IO多路复用

import socket


def main():
    # 创建数据流套接字
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接服务器
    server_ip = input("请输入要连接的服务器的ip:")
    serve_port = int(input("请输入要连接的服务器的port:"))
    server_addr = (server_ip, serve_port)
    tcp_client_socket.connect(server_addr)
    # 发送数据
    send_data = input("请输入要发生的数据:")
    tcp_client_socket.send(send_data.encode("utf-8"))
    # 接收服务器发送过来的数据
    recv_data = tcp_client_socket.recv(1024)
    print("接收到的数据为:%s" % recv_data.decode("utf-8"))
    # 关闭套接字
    tcp_client_socket.close()


if __name__ == "__main__":
    main()

7.TCP服务器-epool方法实现IO多路复用

import socket
import select
import traceback
import sys


def main():
    # 创建数据流套接字
    tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置端口重用
    tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 绑定端口,运行局域网内的客户端连接
    tcp_server_socket.bind(("0.0.0.0", 7890))
    # 让默认的套接字由主动变为被动监听 listen
    tcp_server_socket.listen(128)
    # 设置套接字超时等待时间为5秒
    tcp_server_socket.settimeout(5)

    # 创建一个IO事件地图
    fdmap = {tcp_server_socket.fileno(): tcp_server_socket}
    # 创建poll对象
    epool_list = select.poll()
    # 将监听套接对应的fd注册到poll中进行监听,如果fd已经注册过,则会发生异常
    epool_list.register(tcp_server_socket, (select.EPOLLIN | select.EPOLLERR))

    # 循环目的: 等待新的客户端连接或者已连接的客户端发送过来数据
    while True:
        print("开始事件监控")
        events = epool_list.poll()
        for fd, event in events:
            if fd == tcp_server_socket.fileno():
                new_client_socket, client_addr = tcp_server_socket.accept()
                print("客户端' %s '已经到来" % str(client_addr))
                epool_list.register(new_client_socket, select.EPOLLIN)
                # 记录这个信息
                fdmap[new_client_socket.fileno()] = new_client_socket
            elif event & select.EPOLLIN:
                try:
                    # 已经连接的客户端发送消息时
                    recv_data = fdmap[fd].recv(1024)
                    if not recv_data:
                        # 在poll中注销客户端的信息
                        epool_list.unregister(fd)
                        print("已经为客户端已经服务完毕")
                        # 关闭客户端的文件句柄
                        fdmap[fd].close()
                        # 在字典中删除与已关闭客户端相关的信息
                        del fdmap[fd]
                    else:
                        print("客户端发送过来的请求是: %s" % (recv_data.decode("utf-8")))
                        # 回送数据给客户端表示响应客户端的请求
                        send_data = "----ok----Request accepted"
                        fdmap[fd].send(send_data.encode("utf-8"))
                except Exception:
                    traceback.print_exc()
            elif event & select.EPOLLERR:
                if fd is tcp_server_socket.fileno():
                    # 关闭异常的监听套接字
                    fdmap[fd].close()
                    # 退出程序
                    sys.exit(1)


if __name__ == "__main__":
    main()

8.TCP客户端-epool方法实现IO多路复用

import socket


def main():
    # 创建数据流套接字
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接服务器
    server_ip = input("请输入要连接的服务器的ip:")
    serve_port = int(input("请输入要连接的服务器的port:"))
    server_addr = (server_ip, serve_port)
    tcp_client_socket.connect(server_addr)
    # 发送数据
    send_data = input("请输入要发生的数据:")
    tcp_client_socket.send(send_data.encode("utf-8"))
    # 接收服务器发送过来的数据
    recv_data = tcp_client_socket.recv(1024)
    print("接收到的数据为:%s" % recv_data.decode("utf-8"))
    # 关闭套接字
    tcp_client_socket.close()


if __name__ == "__main__":
    main()
原文地址:https://www.cnblogs.com/tangxuecheng/p/13634530.html