并发编程:协程TCP、非阻塞IO、多路复用、

本文目录:

一、线程池实现阻塞IO

二、非阻塞IO模型

三、多路复用,降低CPU占用

四、模拟异步IO

 

一、线程池实现阻塞IO

线程阻塞IO 客户端

import socket

c = socket.socket()

c.connect(("127.0.0.1",9999))

while True:
    msg = input(">>>:")
    if not msg:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(data.decode("utf-8"))

线程阻塞IO服务器

from concurrent.futures import ThreadPoolExecutor
import socket

server = socket.socket()
# 重用端口
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

server.bind(("127.0.0.1",9999))

server.listen(5)

# 线程池
pool = ThreadPoolExecutor(3)


def data_handler(conn):
    print("一个新连接..")
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())

while True:
    conn,addr = server.accept()
    # 切到处理数据的任务去执行
    pool.submit(data_handler,conn)

 

二、非阻塞IO模型

迭代期间不能被修改迭代的对象

li = [1,2,3,4,5,6]
def mytlist_iter():
    for i in range(len(li)):
        yield li[i]


for j in mytlist_iter():
    if j == 5:
        li.append(1000)


d = {"a":1,"b":2}
for k in d:
    if k == "a":
        d.pop(k)

线程阻塞IO客户端

import socket

c = socket.socket()

c.connect(("127.0.0.1",9999))

while True:
    msg = input(">>>:")
    if not msg:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(data.decode("utf-8"))

线程阻塞IO服务器

from concurrent.futures import ThreadPoolExecutor
import socket

server = socket.socket()
# 重用端口
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

server.bind(("192.168.11.210",9999))

server.listen(5)

# 设置是否为阻塞 默认阻塞
server.setblocking(False)

def data_handler(conn):
    print("一个新连接..")
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())
# 已连接的客户端
clients = []
# 需要发送的数据
send_datas = []
# 已经发送完的 需要删除的数据
del_datas = []
# 待关闭的客户端
closed_cs = []
import time
while True:
    try:
        conn,addr = server.accept()
        # 切到处理数据的任务去执行
        # 代码走到这里才算是连接成功
        # 把连接成功的客户端存起来
        clients.append(conn)
    except BlockingIOError:
        # print("没有可以处理的连接 就干别的活儿")
        #要处理的是已经连接成功的客户端
        # 接收数据
        for c in clients:
            try:
                data = c.recv(1024)
                if not data:
                    # 对方关闭了连接
                    c.close()
                    # 从客户端列表中删除它
                    closed_cs.append(c)
                    continue
                print("收到%s" % data.decode("utf-8"))
                # 现在非阻塞 send直接往缓存赛 如果缓存满了 肯定有错误  需要单独处理发送
                # c.send(data.upper())
                send_datas.append((c,data))
            except BlockingIOError:
                pass
            except ConnectionResetError:
                # 对方关闭了连接
                c.close()
                # 从客户端列表中删除它
                closed_cs.append(c)
        # 处理发送数据
        for data in send_datas:
            try:
                data[0].send(data[1].upper())
                # 发送成功需要删除 不能直接删除
                # send_datas.remove(data)
                del_datas.append(data)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                # 客户端连接需要删除
                data[0].close()
                closed_cs.append(data[0])
                # 等待发送的数据需要删除
                del_datas.append(data)
        # 删除无用的数据
        for d in del_datas:
            #从待发送的列表中删除
            send_datas.remove(d)
        del_datas.clear()
        for c in closed_cs:
            clients.remove(c)
        closed_cs.clear()

非线程阻塞IO 客户端

import socket

c = socket.socket()

c.connect(("127.0.0.1",9999))

while True:
    msg = input(">>>:")
    if not msg:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(data.decode("utf-8"))

非线程阻塞IO 服务器

from concurrent.futures import ThreadPoolExecutor
import socket

server = socket.socket()
# 重用端口
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

server.bind(("192.168.11.210",9999))

server.listen(5)

# 设置是否为阻塞 默认阻塞
server.setblocking(False)

def data_handler(conn):
    print("一个新连接..")
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())
# 已连接的客户端
clients = []
# 需要发送的数据
send_datas = []
# 已经发送完的 需要删除的数据
del_datas = []
# 待关闭的客户端
closed_cs = []
import time
while True:
    try:
        conn,addr = server.accept()
        # 切到处理数据的任务去执行
        # 代码走到这里才算是连接成功
        # 把连接成功的客户端存起来
        clients.append(conn)
    except BlockingIOError:
        # print("没有可以处理的连接 就干别的活儿")
        #要处理的是已经连接成功的客户端
        # 接收数据
        for c in clients:
            try:
                data = c.recv(1024)
                if not data:
                    # 对方关闭了连接
                    c.close()
                    # 从客户端列表中删除它
                    closed_cs.append(c)
                    continue
                print("收到%s" % data.decode("utf-8"))
                # 现在非阻塞 send直接往缓存赛 如果缓存满了 肯定有错误  需要单独处理发送
                # c.send(data.upper())
                send_datas.append((c,data))
            except BlockingIOError:
                pass
            except ConnectionResetError:
                # 对方关闭了连接
                c.close()
                # 从客户端列表中删除它
                closed_cs.append(c)
        # 处理发送数据
        for data in send_datas:
            try:
                data[0].send(data[1].upper())
                # 发送成功需要删除 不能直接删除
                # send_datas.remove(data)
                del_datas.append(data)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                # 客户端连接需要删除
                data[0].close()
                closed_cs.append(data[0])
                # 等待发送的数据需要删除
                del_datas.append(data)
        # 删除无用的数据
        for d in del_datas:
            #从待发送的列表中删除
            send_datas.remove(d)
        del_datas.clear()
        for c in closed_cs:
            clients.remove(c)
        closed_cs.clear()

 

三、多路复用,降低CPU占用

多路复用客户端

import socket

c = socket.socket()

c.connect(("192.168.11.210",9999))

while True:
    msg = input(">>>:")
    if not msg:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(data.decode("utf-8"))

多路复用服务器

from concurrent.futures import ThreadPoolExecutor
import socket
import select
# select 帮你从一堆连接中找出来需要被处理的连接

server = socket.socket()
# 重用端口
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

server.bind(("192.168.11.210",9999))

server.listen(5)

# 设置是否为阻塞 默认阻塞
server.setblocking(False)

def data_handler(conn):
    print("一个新连接..")
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())

# 需要检测的 是否可读取的列表  (recv就是一个读取操作)
rlist = [server,]
# 需要检测的 是否写入的列表  (send就是写入操作)
wlist = []

# 需要发送的数据 目前是因为 我们要把接收的数据在发回去 所以搞了这个东西 正常没有这种需求
# 目前客户端与服务器端 交互 是必须客户端发送数据 服务器端才能返回数据   正常没有这种需求
dic = {}


while True: # 用于检测需要处理的连接 需要不断检测 所以循环
    # rl目前可读的客户端列表  wl目前可写的客户端列表
    rl,wl,xl = select.select(rlist,wlist,[]) # select默认阻塞 阻塞到任意一个连接可以被处理
    print(len(rl))
    # 处理可读的socket
    for c in rl:
        # 无论是客户端还是服务器只要可读就会执行到这里
        if c == server:
            # 接收客户端的连接请求 (一个读操作)
            conn,addr = c.accept()
            # 将新连接也交给select来检测
            rlist.append(conn)
        else:# 不是服务器 就是客户端 客户端可读 可以执行recv
            try:
                data = c.recv(1024)
                if not data:
                    c.close()
                    rlist.remove(c)
                print("%s 发送 %s" % (c,data.decode("utf-8")))
                # 给客户端发送数据 前要保证目前可以发送 将客户端加入检测列表
                wlist.append(c)  # 正常开发中 不可能必须客户端发送数据过来后 才能 给客户端发送
                                 # 所以这个添加到检测列表的操作 应该建立连接后立即执行
                # 要发送的数据
                dic[c] = data
            except ConnectionResetError:
                # 客户端关闭连接
                c.close()
                rlist.remove(c)
    # 处理可写的socket
    for c in wl:
        print(c)
        try:
            c.send(dic[c].upper())
            # 删除数据
            dic.pop(c)
            # 从检测列表中删除已发送完成的客户端
            wlist.remove(c)
        except ConnectionResetError:
            c.close() # 关闭连接
            dic.pop(c) # 删除要发送的数据
            wlist.remove(c) # 从待检测的列表中删除
        except BlockingIOError:#可能缓存满了 发不了
            pass

 

四、模拟异步IO

import asyncio
asyncio.coroutine()
from concurrent.futures import  ThreadPoolExecutor

def task():
    print("read start")
    with open(r"D:python视频存放目录python笔记day40多路复用,降低CPU占用服务器.py",encoding="utf-8") as f:
        text = f.read()
        # f.write()
    print("read end")
    return text


def fin(f):
    print("fin")
    print(f.result())


pool = ThreadPoolExecutor(1)
future = pool.submit(task)
future.add_done_callback(fin)

print("主 over")
# 这种方式看起来像是异步IO 但是对于子线程而言不是
# 在子线程中 执行read 是阻塞的 以为CPU必须切走 但是不能保证切到当前程序的其他线程
# 想要的效果就是 在执行read 是不阻塞 还能干其他活   谁能实现  只有协程
# asyncio 内部是使用的是协程
原文地址:https://www.cnblogs.com/wuzhengzheng/p/10273448.html