day53-线程池、协程、IO

#1、from concurrent import futures可以开启进程池和线程池。concurrent是包,futures是模块,ThreadPoolExecutor是类,submit是方法。
#submit创建和开启子线程:
from concurrent import futures
import time
import random

def func(n):
    print(n)
    time.sleep(random.randint(1,3))#看效果:一开始先执行5个线程,后来谁先执行完就结束,轮到下一个线程执行。

p = futures.ThreadPoolExecutor(5) #线程池里有5个线程。
for i in range(10):              #开启10个子线程。
    p.submit(func,'hello,world')  #submit合并了创建线程对象和start的功能。

#2、result获取返回值,shutdown封装了close和join:
#主线程传参给子线程处理数据,子线程把值返回给主线程。
from concurrent import futures
def func(n):
    print(n)
    return n + 1

thread_pool = futures.ThreadPoolExecutor(5)
t_lst = []
for i in range(10):
    t = thread_pool.submit(func,1) #submit提交任务
    t_lst.append(t)
thread_pool.shutdown() #shutdown封装了close()和join(),意思是线程池关闭继续放线程的功能,
                       # 主线程阻塞在这里,等待子线程全部结束之后才变成非阻塞,下面的代码才能继续执行。
                       #这样操作的结果就是,先打印出n最后才打印n+1。
for t in t_lst:
    print(t.result()) #获取返回值

#3、map创建和开启子线程,后面必须是可迭代的,不可以接收返回值,所以如果func有return返回值,是无法接收的。
from concurrent import futures
def func(i):
    print(i)

thread_pool = futures.ThreadPoolExecutor(5)
thread_pool.map(func,range(10))

#4、回调函数:add_done_callback(call)
#call的args接收func的返回值
from concurrent import futures
def func(i):
    return i*'*'

def call(args):
    print(args.result())

thread_pool = futures.ThreadPoolExecutor(5)
for i in range(10):
    t = thread_pool.submit(func,i)
    t.add_done_callback(call)

#5、from concurrent import futures 还可以开启进程,只需要把上面的ThreadPoolExecutor修改为ProcessPoolExecutor,再加上
#if __name__ == '__main__'就可以了,其他代码都不需要修改。
if __name__ == '__main__':
p = futures.ProcessPoolExecutor()

#6、进程和线程都不能无限开启,进程数量 = CPU+1 ,线程数量 = CPU*5

协程:

#1、greenlet:在单线程中切换状态的模块
from greenlet import greenlet
def eat():
    print('a')
    g2.switch()
    print('c')
    g2.switch()

def eat2():
    print('b')
    g1.switch()
    print('d')

g1 = greenlet(eat)
g2 = greenlet(eat2)
g1.switch()

#2、gevent的底层是greenlet
import gevent
def func():
    print(1)
    gevent.sleep(1) #如果这里是time.sleep(1)是无法切换的,因为gevent只能识别它内置的IO才会自动切换。
    print(3)

def func1():
    print(2)
    gevent.sleep(1)
    print(4)

g1 = gevent.spawn(func)
g2 = gevent.spawn(func1)
# g1.join()
# g2.join()
gevent.joinall([g1,g2]) #相当于g1.join()和g2.join(),主协程等待子协程的结束而结束。
# 1
# 2
# 3
# 4

#3、第一二句代码必须写在最上面,意思是把IO都打成一个包,这样下面的time.sleep就可以识别了,然后就可以切换了。
#第一二句代码能识别gevent内置当中的IO,以及导入模块当中的IO,导入的模块目前学习到的是:time,socket,urllib,requests。
from gevent import monkey
monkey.patch_all()
import time
import gevent
def func(n):
    print(n)
    time.sleep(1)
    print(3)

def func1():
    print(2)
    time.sleep(1)
    print(4)

g1 = gevent.spawn(func,1)#创建协程对象,g1 = gevent.spawn(func,1,2,3,x=1),可以是位置实参或关键字实参。
g2 = gevent.spawn(func1)
gevent.joinall([g1,g2])
# 1
# 2
# 3
# 4

#4、协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

#5、协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:
# 协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

#6、需要强调的是:
#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

#7、对比操作系统控制线程的切换,用户在单线程内控制协程的切换,
#优点如下:1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#         2. 单线程内就可以实现并发的效果,最大限度地利用cpu
#缺点如下:1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
#         2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

#8、server:
from gevent import monkey
monkey.patch_all()
import socket
import gevent

def talk(conn):
    while True:
        ret = conn.recv(1024).decode('utf-8')
        print(ret)
        conn.send(ret.upper().encode('utf-8'))
    conn.close()

sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:                  #每接收一个连接就开启一个协程。
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)

sk.close()

#client:
import socket
from threading import Thread

def talk():
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8080))
    while True:
        sk.send(b'hi')
        ret = sk.recv(1024).decode('utf-8')
        print(ret)
    sk.close()

for i in range(500):
    Thread(target=talk).start()

IO

#阻塞IO:工作效率低
#非阻塞IO:工作效率高,CPU负担大,不建议使用。
#IO多路复用:在有多个对象需要IO阻塞的时候,能够有效的减少阻塞带来的时间损耗,在一定程度上减少CPU的负担。选择这个比较好。
#异步IO:工作效率高,CPU负担小

#IO多路复用:select:选择
#步骤:1、select监视read_lst里的每个对象,当发现client十个线程请求连接,sk会收到信号,每循环一次增加一个conn,
# 十次就是10个conn,这时read_lst列表里面有一个sk和10个conn,sk是不变的。
# 2、接着client每一次发消息过来,conn收到信号,而select每一次都会选择把10个conn中的一个或者多个放在rl里面,不会放sk,
# 接着就执行else下面的代码。
#server:
import socket
import select
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()

read_lst = [sk]
count = 1
while True:
rl,wl,xl = select.select(read_lst,[],[])#select阻塞,rl读列表,wl写列表,xl修改列表。
print(count,rl)
count += 1
for item in rl:
if item == sk:
conn,addr = item.accept()
print(conn)
read_lst.append(conn)
print(read_lst)
else:
ret = item.recv(1024).decode('utf-8')
if not ret: #如果ret是空的
item.close() #关闭conn
read_lst.remove(item) #列表删除这个conn,避免下次重复被使用
else: #ret不是空的
print(ret)
item.send('recv succeed'.encode('utf-8'))


#client:
import socket
from threading import Thread
import time
def talk(args):
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
for i in range(10):
time.sleep(2)
sk.send(('线程%s,第%s次对话'%(args,i)).encode('utf-8'))
ret = sk.recv(1024).decode('utf-8')
print(ret)
sk.close()
for i in range(10):
Thread(target=talk,args=(i,)).start()

 
原文地址:https://www.cnblogs.com/python-daxiong/p/12142818.html