网络编程的各种模块

各种模块:

from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn
import select
from multiprocessing import Process,Semaphore,RLock,Event,JoinableQueue
from threading import Thread,Semaphore,Event,RLock
import queue
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor


pool1 = ProcessPoolExecutor()
pool2 = ThreadPoolExecutor()
a = queue.Queue()
b = queue.LifoQueue()
c = queue.PriorityQueue()
d = JoinableQueue()
x = Semaphore(5)
e = Event()

def task(tt):
    print(tt)  # tt 普通参数

    x.acquire()
    # print(1)
    # time.sleep(2)
    x.release()
    # e.set()
    return 'ok'

# TypeError: task1() takes 0 positional arguments but 1 was given
def task1(t):
    print(t)  # <Future at 0x31eb4d0 state=finished returned NoneType>
    print(t.result())  # ok 返回的是提交进程,的结果;也就是说 add_done_callback 里的方法名,默认第一个参数是Future 对象,,,
                       # 即task这个任务提交之后的返回的对象。也就是说,这个add_done_callback是个对象方法。self。。。
    # e.wait()
    print('wl')

if __name__ == '__main__':
    # for i in range(20):
    #     t1 = Thread(target=task)
    #     t1.start()
    # t2 = Thread(target=task1)
    # t2.start()
    for i in range(20):
        pool2.submit(task,i).add_done_callback(task1)  # submit 提交
        if i == 10:
            pool2.shutdown()  # RuntimeError: cannot schedule new futures after shutdown 关闭线程池,循环并没有结束,会再次提交任务,所以,要break
            break
"""
异步提交
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
"""


# def heng():
#     print('哼')
#     time.sleep(2)
#     print('哼')
#
#
# def ha():
#     print('哈')
#     time.sleep(3)
#     print('哈')
#
# def heiheihei():
#     print('heiheihei')
#     time.sleep(5)
#     print('heiheihei')
#
#
# start_time = time.time()
# g1 = spawn(heng).join()
# g2 = spawn(ha)
# g3 = spawn(heiheihei)
# # g1.join()
# # g2.join()  # 等待被检测的任务执行完毕 再往后继续执行
# # g3.join()
# # heng()
# # ha()
# # print(time.time() - start_time)  # 5.005702018737793
# print(time.time() - start_time)  # 3.004199981689453   5.005439043045044

多路复用IO:

"""
当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比比不上!!!
但是IO多路复用可以一次性监管很多个对象

server = socket.socket()
conn,addr = server.accept()

监管机制是操作系统本身就有的 如果你想要用该监管机制(select)
需要你导入对应的select模块
"""
import socket
import select


server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)
read_list = [server]


while True:
    r_list, w_list, x_list = select.select(read_list, [], [])
    """
    帮你监管
    一旦有人来了 立刻给你返回对应的监管对象
    一开始只监管server,当有客户端连接,被监管的server,有了回应,所以返回的r_list中有server
    添加连接对象到read_list,增加了一个监管的对象。当连接对象,有发送数据,那么,检测到了结果,返回的r_list,就有了连接对象。
    也就是说被监管的对象,哪个有结果回应,就添加到r_list中,从而程序往下运行。
    当连接断开时,也是一种结果,收到的信息为空。
    """
    # print(res)  # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], [])
    # print(server)
    # print(r_list)
    print(read_list)
    print(r_list,w_list,x_list)  # r_list 一开始只有server,有了连接对象之后,只有连接对象。
    for i in r_list:  #
        """针对不同的对象做不同的处理"""
        if i is server:
            conn, addr = i.accept()
            # 也应该添加到监管的队列中
            read_list.append(conn)
            print('111111111')
        else:
            res = i.recv(1024)
            if len(res) == 0:
                i.close()
                # 将无效的监管对象 移除
                read_list.remove(i)
                print(res)  # b''
                continue
            print(res)
            i.send(b'heiheiheiheihei')
原文地址:https://www.cnblogs.com/pythonwl/p/12796585.html