进程 线程 协程

进程

进程 :
什么是进程?
是操作系统的发展过程中,为了提高CPU的利用率,在操作系统同时运行多个程序的时候,为了数据的安全代码不混乱而被创造出来的一个概念
每一个程序运行起来都至少是一个进程.
进程是计算机中最小的资源分配单位
进程被操作系统调度的,有很多相关的算法 - 早期的操作系统
进程之间是数据隔离的
进程的三状态 就绪 运行 阻塞
同步异步
    同步 : 一个任务的执行依赖另一个事务的结束
            join lock
    异步 : 一个任务的执行不依赖另一个事务的结束
            start terminate
阻塞非阻塞
    阻塞   : accept recv recvfrom queue.get join
    非阻塞 : setblocking = False
并发并行
    并行是特殊的并发
    并行就是 同一时刻 两个以上的程序同时在cpu上执行
    并发就是 同一时段 两个以上的程序看起来在同时执行
IO概念 : 文件操作 数据库操作 网络传输 用户输入输出
    Input  得到bytes/str
    Output 发送数据/输出数据

因为进程与进程之间本质上是异步且数据隔离
守护进程 : 子进程等待主进程的代码结束就结束了
同步控制
    join
    锁 - 互斥锁 : 多个进程同时对一个数据进行操作的时候 操作同一个文件/数据库/管道/Manager.dict
    信号量
    事件
数据共享 - 数据不安全
    Manager
IPC-进程之间通信
    管道
    队列 - 生产者消费者模型(为了解决数据的生产和处理的效率问题)
    第三方工具(消息队列,消息中间件)
进程池
    解决大量任务 开启多个进程的开销过大的问题
    节省资源,提高并发效率的
    一般开进程数 cpu_count * 1 or 2
进程? 时间片? 阻塞非阻塞,同步异步,进程三状态,并发并行,数据隔离,IO概念//进程池,IPC,锁,事件,信号量 ...
Python并不支持真正意义上的多线程。Python中提供了多线程模块,但如果想通过多线程提高代码的速度,并不推荐使用多线程模块。Python中有一个全局锁Global Interpreter Lock(GIL),全局锁会确保任何时候多个线程中只有一个会被执行。线程的执行速度非常快,会误以为线程是并行执行的,但实际上都是轮流执行。经过GIL处理后,会增加线程执行的开销。
全局锁 GIL(Global interpreter lock) 并不是 Python 的特性,而是在实现 Python 解析器(CPython)时所引入的一个概念。Python有CPython,PyPy,Psyco 等不同的 Python 执行环境,其中 JPython 没有GIL。CPython 是大部分环境下默认的 Python 执行环境,GIL 并不是 Python 的特性,Python 完全可以不依赖于 GIL。
GIL 限制了同一时刻只能有一个线程运行,无法发挥多核 CPU 的优势。GIL 本质是互斥锁,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。在一个 Python 的进程内,不仅有主线程或者由主线程开启的其它线程,还有解释器开启的垃圾回收等解释器级别的线程。进程内,所有数据都是共享的,代码作为一种数据也会被所有线程共享,多个线程先访问到解释器的代码,即拿到执行权限,然后将 target 的代码交给解释器的代码去执行,解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,因此为了保证数据安全需要加锁处理,即 GIL。 
由于GIL 的存在,同一时刻同一进程中只有一个线程被执行。多核 CPU可以并行完成计算,因此多核可以提升计算性能,但 CPU 一旦遇到 I/O 阻塞,仍然需要等待,所以多核CPU对 I/O 密集型任务提升不明显。根据执行任务是计算密集型还是I/O 密集型,不同场景使用不同的方法,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势
GIL
所有的程序 - 任务
所有的任务 - 进程
进程 ?  进行中的程序  PID进程ID
    进程是计算机中资源分配的最小单位
    进程间数据隔离   要通信的话运用socket
  进程调度:--先来先服务调度算法,短作业优先,时间片轮转法,多级反馈机制

并发 资源有限的情况下,AB程序交替使用cpu目的是提高效率
并行 同一时刻都在执行 多核  (是并发里的一种特殊情况)  更苛刻的条件

同步:  程序顺序执行,多个任务之间串行执行 (洗衣完--做饭完--洗碗)
异步:  多个任务同时运行  (在同一时间内洗衣做饭洗碗)

阻塞: 程序由于不符合某个条件或者等待某个条件满足 而在某一个地方进入等待状态
非阻塞: 程序正常执行
同步阻塞
一件事儿一件事儿的做
中间还要被阻塞
同步非阻塞 : 费力不讨好
一件事儿一件事儿的做
但是不阻塞
异步阻塞
同时进行的
每一件事儿都会遇到阻塞事件
异步非阻塞
几个事情同时进行
每一件事都不阻塞
 
import os,time
print(os.getpid())  #获取当前进程号
print(os.getppid()) #获取当前父进程id

time.sleep(5)
print(os.getpid()) 
查看进程号
import multiprocessing  #是个包

from multiprocessing import Process
import os
def son_process():
    # 这个函数的代码实在子进程执行的
    print('执行我啦',os.getpid(),os.getppid())
    print()

if __name__ == '__main__':
    print('1-->',os.getpid()) #主进程
    p = Process(target=son_process) #实例化
    p.start()  #进程开始
下面是打印结果
#  1--> 6416
#  执行我啦 7388 6416
python 开启一个子进程
import time
from multiprocessing import Process
# 通过并发实现一个有并发效果的socket server

# 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行)
def son_process():
    print('son start')
    time.sleep(1)
    print('son end')

if __name__ == '__main__':
    p = Process(target=son_process) #创建子进程
    p.start() #通知操作系统开启一个子进程  os响应需求 分配资源 执行进程中的代码
    print('主进程')
并发的主/子进程
import time
from multiprocessing import Process
# 通过并发实现一个有并发效果的socket server

# 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行)
def son_process():
    print('son start')
    time.sleep(1)
    print('son end')

if __name__ == '__main__':
    p = Process(target=son_process) #创建子进程
    p.start() #通知操作系统开启一个子进程  os响应需求 分配资源 执行进程中的代码

    for i in range(5):
        print('主进程')
        time.sleep(0.3)
时间测试 主/子进程
import time
from multiprocessing import Process
def son_process():
    print('son start')
    time.sleep(1)
    print('son end')

if __name__ == '__main__':
    for i in range(3):
        p = Process(target=son_process)
        p.start()
开启多个子进程
import time
from multiprocessing import Process
def son_process(i):
    print('son start',i)
    time.sleep(1)
    print('son end',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=son_process,args=(i,)) #args必须元组
        p.start()  # 通知操作系统 start并不意味着子进程已经开始了
子进程中传参数
import time
from multiprocessing import Process
def son_process(i):
    print('son start',i)
    time.sleep(1)
    print('son end',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=son_process,args=(i,))  #子进程不支持返回值
        p.start()
    print('主进程的代码执行完毕')
# 主进程会等待子进程结束之后才结束
# 为什么?
# 父进程负责创建子进程,也负责回收子进程的资源
主进程和子进程之间的关系
 - server.py-
import socket,time
from multiprocessing import Process
def talk(conn):
    conn, addr = sk.accept()
    print(conn)
    while True:
        msg = conn.recv(1024).decode()
        time.sleep(10)
        conn.send(msg.upper().encode())

if __name__ == '__main__':
    # 这句话下面的所有代码都只在主进程中执行
    sk = socket.socket()
    sk.bind(('127.0.0.1',9000))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        Process(target=talk,args=(sk,)).start()

# 卡 大量的while True 并且代码中并没有太多的其他操作
# 如果我们使用socketserver,不会这么卡
# 多进程确实可以帮助我们实现并发效果,但是还不够完美
# 操作系统没开启一个进程要消耗大量的资源
# 操作系统要负责调度进程 进程越多 调度起来就越吃力

 - clitent.py-
import socket

sk = socket.socket()

sk.connect(('127.0.0.1',9000))
while True:
    sk.send(b'hello')
    print(sk.recv(1024))
多进程实现socketserver
def son_process(i):
    while True:
        print('son start',i)
        time.sleep(0.5)
        print('son end',i)

if __name__ == '__main__':
    p = Process(target=son_process, args=(1,))
    p.start()           # 开启一个子进程,异步的
    print('主进程的代码执行完毕')
    print(p.is_alive())  # 子进程还活着
    p.terminate()        # 结束一个子进程,异步的
    print(p.is_alive())  # 子进程还在活着
    time.sleep(0.1)
    print(p.is_alive()) # False
主进程可不可以直接结束一个子进程?
n = [100]
def sub_n():  # 减法
    global n  # 子进程对于主进程中的全局变量的修改是不生效的
    n.append(1)
    print('子进程n : ',n)   #子进程n :  [100, 1]
if __name__ == '__main__':
    p = Process(target = sub_n)
    p.start()
    p.join()     # 阻塞 直到子进程p结束
    print('主进程n : ',n)  #主进程n :  [100]
数据隔离
# 主进程里的print('主进程n : ',n)这句话在十个子进程执行完毕之后才执行
n = [100]
import random
def sub_n():
    global n  # 子进程对于主进程中的全局变量的修改是不生效的
    time.sleep(random.random())
    n.append(1)
    print('子进程n : ',n)
if __name__ == '__main__':
    p_lst = []  #进程添加进列表
    for i in range(10):
        p = Process(target = sub_n) #创建
        p.start() #通知os 开启
        p_lst.append(p)
    for p in p_lst:p.join()  # 阻塞 只有一个条件是能够让我继续执行 这个条件就是子进程结束
    print('主进程n : ',n)
开启十个进程执行subn
n = [100]
def sub_n():
    global n  # 子进程对于主进程中的全局变量的修改是不生效的
    n.append(1)
    print('子进程n : ',n)
    time.sleep(10)
    print('子进程结束')

if __name__ == '__main__':
    p = Process(target = sub_n)
    p.start()
    p.join(timeout = 5)     # 如果不设置超时时间 join会阻塞直到子进程p结束
#     # timeout超时
#     # 如果设置的超时时间,那么意味着如果不足5s子进程结束了,程序结束阻塞
#     # 如果超过5s还没有结束,那么也结束阻塞
    print('主进程n : ',n)
    p.terminate()  # 也可以强制结束一个子进程
join拓展 超时时间
# 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束
# 由于主进程要负责给所有的子进程收尸,所以主进程必须是最后结束,守护进程只能在主进程的代码结束之后就认为主进程结束了
# 守护进程在主进程的代码结束之后就结束了,不会等待其他子进程结束
#
# 希望守护进程必须等待所有的子进程结束之后才结束
# ????
# import time
# from multiprocessing import Process
# def alive():
#     while True:
#         print('连接监控程序,并且发送报活信息')
#         time.sleep(0.6)
#
# def func():
#     '主进程中的核心代码'
#     while True:
#         print('选择的项目')
#         time.sleep(1)
#         print('根据用户的选择做一些事儿')
#
# if __name__ == '__main__':
#     p = Process(target=alive)
#     p.daemon = True   # 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束
#     p.start()
#     p = Process(target=func)
#     p.start()
#     p.join()   # 在主进程中等待子进程结束,守护进程就可以帮助守护其他子进程了


# 守护进程
# 1.守护进程会等待主进程的代码结束而结束,不会等待其他子进程的结束
# 2.要想守护进程等待其他子进程,只需要在主进程中加上join
守护进程
for i in range(5):
    pass
print(i)  # i=4

lst = []
for i in range(5):
    p = Process()
    lst.append(p)
    p.start()
for p in lst:
    p.join()
    p.terminate()
操作多个子进程的结束terminate和join阻塞
import os
from multiprocessing import Process
class MyProcess(Process): #继承
    def __init__(self,参数):
        super().__init__() #父类 初始化
        self.一个属性 = 参数 

    def run(self):
        print('子进程中要执行的代码')

if __name__ == '__main__':
    conn = '一个链接'
    mp = MyProcess(conn)
    mp.start()
面向对象 开启子进程的方法
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
接下来,我们以  模拟抢票  为例,来看看数据安全的重要性。
import json
import time
from multiprocessing import Process,Lock

def search(name):
    '''查询余票的功能'''
    with open('ticket') as f:  # 'r'
        dic = json.load(f)  # 读取  dict
        print(name , dic['count'])

def buy(name): # 买票
    with open('ticket') as f:
        dic = json.load(f)
    time.sleep(0.1)
    if dic['count'] > 0:
        print(name,'买到票了')
        dic['count'] -= 1
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)  # 写进去

def get_ticket(name,lock): #整个操作
    search(name) # 先查
    lock.acquire()  # 只有第一个到达的进程才能获取锁,剩下的其他人都需要在这里阻塞  上锁
    buy(name)  # 再买
    lock.release()  # 有一个人还锁,会有一个人再结束阻塞拿到钥匙  还锁

if __name__ == '__main__':
    lock = Lock()  # 实例化锁
    for i in range(10):  # 10个进程
        p = Process(target=get_ticket,args=('name%s'%i,lock)) # 创建
        p.start() # 通知os 开启

# tips : ticket 里面的数据结构 {"count": 0}

    # 模拟过程描述:
# 第一个来的人 取钥匙 开门 进门 关门 带着钥匙反锁
# 第一个拿到钥匙的人  开门 出门 锁门 挂钥匙
锁 multiprocess.Lock 模拟抢票 例子
进程 状态码 Z/z 僵尸进程 linux命令
主进程中控制子进程的方法:
    p = Process(target,args) #创建这一刻 根本没有通知操作系统
    p.start() #通知os 开启子进程  异步非阻塞
    p.terminate() #通知os,关闭子进程,异步非阻塞
    p.is_alive() # 查看子进程是否还活着
    p.join(timeout=10) # 阻塞 直到子进程结束  超时时间理解
    
# 守护进程
    # 守护进程是一个子进程
    # 守护进程会在主进程代码结束之后才结束
    # 为什么会这样?
        # 由于主进程必须要回收所有的子进程的资源
        # 所以主进程必须在子进程结束之后才能结束
        # 而守护进程就是为了守护主进程存在的
        # 不能守护到主进程结束,就只能退而求其次,守护到代码结束了
    # 守护到主进程的代码结束,意味着如果有其他子进程没有结束,守护进程无法继续守护
    # 解决方案 : 在主进程中加入对其他子进程的join操作,来保证守护进程可以守护所有主进程和子进程的执行
    # 如何设置守护进程
        # 子进程对象.daemon = True 这句话写在start之前

#
    # 为什么要用锁?
    # 由于多个进程的并发,导致很多数据的操作都在同时进行
    # 所以就有可能产生多个进程同时操作 : 文件数据库 中的数据
    # 导致数据不安全
    # 所以给某一段修改数据的程序加上锁,就可以控制这段代码永远不会被多个进程同时执行
    # 保证了数据的安全
# Lock 锁(互斥锁)
# 锁实际上是把你的某一段程序变成同步的了,降低了程序运行的速度,为了保证数据的安全性
# 没有数据安全的效率都是耍流氓
进程 总结
# 对于锁 保证一段代码同一时刻只能有一个进程执行
# 对于信号量 保证一段代码同一时刻只能有n个进程执行
# 流量控制

import time
import random
from multiprocessing import Process,Semaphore
def ktv(name,sem):
    sem.acquire() #拿锁
    print("%s走进了ktv"%name)
    time.sleep(random.randint(5,10))
    print("%s走出了ktv" % name)
    sem.release() #还锁

if __name__ == '__main__':
    sem = Semaphore(4) # 同时只能有4个进程执行
    for i in range(25):
        p = Process(target=ktv,args = ('name%s'%i,sem))
        p.start()

# 信号量原理 : 锁 + 计数器实现的
# 普通的锁 acquire 1次
# 信号量   acquire 多次
# count计数
# count = 4
# acquire count -= 1
# 当count减到0的时候 就阻塞
# release count + = 1
# 只要count不为0,你就可以继续acquire
信号量 Semaphore
# from multiprocessing import Event
# Event 事件类
# e = Event()
# e 事件对象
# 事件本身就带着标识 : False
# wait 阻塞
# 它的阻塞条件是 对象标识为False
# 结束阻塞条件是 对象标识为True

# 对象的标识相关的 :
# set  将对象的标识设置为True
# clear 将对象的标识设置为False
# is_set 查看对象的标识是否为True

import time
import random
from multiprocessing import Event,Process
def traffic_light(e):
    print('33[1;31m红灯亮33[0m')
    while True:
        time.sleep(2)
        if e.is_set():   # 如果当前是绿灯
            print('33[1;31m红灯亮33[0m') # 先打印红灯亮
            e.clear()                        # 再把灯改成红色
        else :           # 当前是红灯
            print('33[1;32m绿灯亮33[0m') # 先打印绿灯亮
            e.set()                          # 再把灯变绿色
#
def car(e,carname):
    if not e.is_set():  # False
        print('%s正在等待通过'%carname)
        e.wait()  #阻塞
    print('%s正在通过'%carname)

if __name__ == '__main__':
    e = Event()  #创建
    p = Process(target=traffic_light,args = (e,)) #创建进程
    p.start() #开始
    for i in range(100):  #100辆车
        time.sleep(random.randrange(0,3)) #随机
        p = Process(target=car, args=(e,'car%s'%i))
        p.start()

# 太复杂了
# 在我们进行并发操作的时候很少用到这么复杂的场景

# Event事件
# 放到进程中的代码一定不止一段
# 这两个操作之间 存在同步关系
# 一个操作去确认另一个操作的执行条件是否完成

# 标识 控制wait是否阻塞的关键
# 如何修改这个标识 : clear set
# 如何查看这个标识 : is_set
事件Event
# 管道数据不安全 管道加锁就是队列 
from multiprocessing import Pipe,Process

def f(conn):  #接收子conn
    conn.send('hello world') #发消息
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=f,args=(child_conn,)) #传子conn
    p.start()
    print(parent_conn.recv()) #父conn接收
    p.join()
Pipe 管道 IPC数据间通信 是队列的底层
# 进程之间 数据隔离
# 凭什么判断 子进程是否执行完毕了????
# lock对象
# a进程 acquire了 b进程在acquire的地方一直阻塞直到a release
# 你在b进程 如何知道a进程release了?
# 你之前学习的lock semaphore event实际上都用到了进程之间的通信
# 只不过这些通信都是非常简单而固定的信号
# 在你使用这些工具的过程中并感知不到

# 对于用户来讲 : 就希望能够去进行一些更加复杂的 不固定的内容的交互
# 这种情况下使用lock semaphore event就不可行了

# 进程间通信 IPC
# IPC Inter-Process Communication
# 实现进程之间通信的两种机制:
    # 管道 Pipe  数据不安全
    # 队列 Queue  管道+锁

# from multiprocessing import Queue,Process
#
# def consumer(q):
#     print(
#        '子进程 :', q.get()
#     )
#
#
# if __name__ == '__main__':
#     q = Queue()
#     p = Process(target=consumer,args=(q,))
#     p.start()
#     q.put('hello,world')

# 生产者消费者模型
import time
from multiprocessing import Queue,Process

def producer(name,food,num,q):
    '''生产者'''
    for i in range(num):
        time.sleep(0.3)
        foodi = food + str(i)
        print('%s生产了%s'%(name,foodi))
        q.put(foodi)

def consumer(name,q):
    while True:
        food = q.get()   # 等待接收数据
        if food == None:break
        print('%s吃了%s'%(name,food))
        time.sleep(1)

if __name__ == '__main__':
    q = Queue(maxsize=10)
    p1 = Process(target=producer,args = ('宝元','泔水',20,q))
    p2 = Process(target=producer,args = ('战山','鱼刺',10,q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    p1.start()   # 开始生产
    p2.start()   # 开始生产
    c1.start()
    c2.start()
    p1.join()    # 生产者结束生产了
    p2.join()    # 生产者结束生产了
    q.put(None)  # put None 操作永远放在所有的生产者结束生产之后
    q.put(None)  # 有几个消费者 就put多少个None

# 为什么队列为空 为满 这件事情不够准确
# q.qsize()  队列的大小
# q.full()   是否满了 满返回True
# q.empty()  是否空了 空返回True
进程间通信 IPC
import  time
from multiprocessing import JoinableQueue,Process

def consumer(name,q):
    while True:
        food = q.get()
        time.sleep(1)
        print('%s消费了%s'%(name,food))
        q.task_done()

def producer(name,food,num,q):
    '''生产者'''
    for i in range(num):
        time.sleep(0.3)
        foodi = food + str(i)
        print('%s生产了%s'%(name,foodi))
        q.put(foodi)
    q.join()   # 消费者消费完毕之后会结束阻塞
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('宝元', '泔水', 20, q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    c1.start()
    c2.start()
    p1.join()

# 消费者每消费一个数据会给队列发送一条信息
# 当每一个数据都被消费掉之后 joinablequeue的join阻塞行为就会结束
# 以上就是为什么我们要在生产完所有数据的时候发起一个q.join()

# 随着生产者子进程的执行完毕,说明消费者的数据都消费完毕了
# 这个时候主进程中的p1.join结束
# 主进程的代码结束
# 守护进程也结束了
JoinableQueue 消费者模型
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
    #     d['count']-=1
    ''' 等价于下面的代码 '''
    lock.acquire()
    d['count'] -= 1
    lock.release()

if __name__ == '__main__':
    lock=Lock()
    m = Manager()
    dic=m.dict({'count':100})
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=(dic,lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(dic)

# Manager是一个类 内部有一些数据类型能够实现进程之间的数据共享
# dict list这样的数据 内部的数字进行自加 自减 是会引起数据不安全的,这种情况下 需要我们手动加锁完成
# 因此 我们一般情况下 不适用这种方式来进行进程之间的通信
# 我们宁可使用Queue队列或者其他消息中间件 来实现消息的传递 保证数据的安全
进程之间的数据共享 Manager类
import time
from multiprocessing import Process

def func(i):
    i -= 1

if __name__ == '__main__':
    start = time.time()  #计时开始
    l = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p.start()
        l.append(p)
    for p in l:
        p.join()
    print(time.time() - start) #计时结束
100个进程 计算时间
计算时间差
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1

if __name__ == '__main__':
    start = time.time()
    p = Pool(5)  #池  你的池中打算放多少个进程,个数cpu的个数 * 1/2
    p.map(func,range(100))  # 自动带join
    print(time.time()-start)
Pool 进程池
from multiprocessing import Pool
def f(i):
    i -= 1
    return i**2

if __name__ == '__main__':
    p = Pool(5)  #池  你的池中打算放多少个进程,个数cpu的个数 * 1/2
    ret = p.map(f,range(100))  # 自动带join
    print(ret)
Pool 获取程序执行的返回值
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(0.5)
    return i**2

if __name__ == '__main__':
    p = Pool(5) #你的池中打算放多少个进程,个数cpu的个数 * 1|2
    for i in range(100):
        ret = p.apply(func,args=(i,))  # 自动带join 串行/同步 apply就是同步提交任务
        print(ret)
apply 同步方式向进程池提交任务
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(0.3)
    # print(i)
    return i**2
if __name__ == '__main__':
    p = Pool(5)
    lst = []
    for i in range(100):
        ret = p.apply_async(func,args=(i,))  # 自动带join 异步的 apply_async异步提交任务
        lst.append(ret)
    # p.close()   # 关闭进程池的任务提交 从此之后不能再向p这个池提交新的任务
    # p.join()    # 阻塞 一直到所有的任务都执行完
    # print('结束')
    for i in lst :
        print(i.get())
apply_async 异步方式向进程池提交任务
什么是进程池? 有限的进程的池子
为什么要用进程池?
    任务很多 cpu个数*5个任务以上
    为了节省创建和销毁进程的时间 和 操作系统的资源
一般进程池中进程的个数:
    cpu的1-2倍
    如果是高计算,完全没有io,那么就用cpu的个数
    随着IO操作越多,可能池中的进程个数也可以相应增加
向进程池中提交任务的三种方式
    map    异步提交任务 简便算法 接收的参数必须是 子进程要执行的func,可迭代的(可迭代中的每一项都会作为参数被传递给子进程)
        能够传递的参数是有限的,所以比起apply_async限制性比较强
    apply  同步提交任务(你删了吧)
    apply_async 异步提交任务
        能够传递比map更丰富的参数,但是比较麻烦
        首先 apply_async提交的任务和主进程完全异步
        可以通过先close进程池,再join进程池的方式,强制主进程等待进程池中任务的完成
        也可以通过get获取返回值的方式,来等待任务的返回值
            我们不能在apply_async提交任务之后直接get获取返回值
               for i in range(100):
                    ret = p.apply_async(func,args=(i,))  # 自动带join 异步的 apply_async异步提交任务
                    l.append(ret)
                for ret in l:
                    print(ret.get())
进程池总结
回调函数
import os
import time
import random
from multiprocessing import Pool  #
def func(i):     # [2,1,1.5,0,0.2]
    i -= 1
    time.sleep(random.uniform(0,2))
    return i**2

def back_func(args):
    print(args,os.getpid())

if __name__ == '__main__':
    print(os.getpid())
    p = Pool(5)
    l = []
    for i in range(100):
        ret = p.apply_async(func,args=(i,),callback=back_func)  # 5个任务
    p.close()
    p.join()
callback回调函数
主动执行func,然后在func执行完毕之后的返回值,直接传递给back_func作为参数,调用back_func
处理池中任务的返回值
回调函数是由谁执行的? 主进程
回调函数 进程池 回调函数没有返回值
import re
import json
from urllib.request import urlopen  #请求页面包
from multiprocessing import Pool

def get_page(i):  #页面
    ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read()
    ret = ret.decode('utf-8')
    return ret

def parser_page(s): #页面数据分析
    com = re.compile(
        '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>d+).*?<span class="title">(?P<title>.*?)</span>'
        '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S)

    ret = com.finditer(s)
    with open('file','a',encoding='utf-8') as f:
        for i in ret:
            dic = {
                "id": i.group("id"),
                "title": i.group("title"),
                "rating_num": i.group("rating_num"),
                "comment_num": i.group("comment_num"),
            }
            f.write(json.dumps(dic,ensure_ascii=False)+'
')

if __name__ == '__main__':
    p = Pool(5)
    count = 0
    for i in range(10):
        p.apply_async(get_page,args=(count,),callback=parser_page)
        count += 25
    p.close()
    p.join()
进程池 爬虫 豆瓣 例子 处理中文
import json
with open('file2','w',encoding='utf-8') as f:
    json.dump({'你好':'alex'},f,ensure_ascii=False)
ensure_ascii=False

线程 - 

线程
    轻量级 进程  解决并发 整体效率高于进程
    在进程中数据共享  资源共享
    是进程的一部分,不能独立存在的 
    被CPU调度的最小单位
使用场景 socketserver web的框架 django flask tornado 多线程来接收用户并发的请求
python里  同一个进程中的多个线程能不能同时使用多个cpu

在整个程序界:
    如果你的程序需要数据隔离 : 多进程
    如果你的程序对并发的要求非常高 : 多线程

python
初期 面向单核的 一个cpu
作为一门脚本语言  解释型语言

线程锁这件事儿是由Cpython解释器完成
对于python来说 同一时刻只能有一个线程被cpu访问
彻底的解决了多核环境下的安全问题
线程锁 : 全局解释器锁 GIL
    1.这个锁是锁线程的
    2.这个锁是解释器提供的

多线程仍然有它的优势
    你的程序中用到cpu真的多么
    如果100% 90%的时间都消耗在计算上,那么cpython解释器下的多线程对你来说确实没用
    但是你写的大部分程序 的时间实际上都消耗在了IO操作上
遇到高计算型
    开进程 4个进程  每个进程里开n个线程
    换个解释器
线程概念
import os
import time
from threading import Thread
def func():
    print('start',os.getpid())
    time.sleep(1)
    print('end')

if __name__ == '__main__':
    t = Thread(target=func)
    t.start()
    for i in range(5):
        print('主线程',os.getpid())
        time.sleep(0.3)
Thread 线程类 初识
import time
from threading import Thread
def func():
    n = 1 + 2 + 3
    n ** 2

if __name__ == '__main__':
    start = time.time()
    lst = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        lst.append(t)
    for t in lst:
        t.join()
    print(time.time() - start)
#

import time
from multiprocessing import Process as Thread
def func():
    n = 1 + 2 + 3
    n**2

if __name__ == '__main__':
    start = time.time()
    lst = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        lst.append(t)
    for t in lst:
        t.join()
    print(time.time() - start)
进程与线程的效率对比
from threading import Thread
n = 100
def func():
    global n
    n -= 1

t = Thread(target=func)
t.start()
t.join()
print(n)
线程的数据共享
from threading import Thread
class Mythread(Thread):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg
    def run(self):
        print('in son',self.arg)

t = Mythread(123)
t.start()
开启线程的另一种方式 传参数
import time
from threading import Thread,currentThread,activeCount,enumerate
class Mythread(Thread):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg
    def run(self):
        time.sleep(1)
        print('in son',self.arg,currentThread())
# t = Mythread(123)
# t.start()
# print('主',currentThread()) #当前线程
#
for i in range(10):
    t = Mythread(123)
    t.start()
    # print(t.ident) #当前线程id
print(activeCount())  #几个活跃的线程  11
print(enumerate()) # 一共几个线程 []
threading模块中的其他功能
- server.py - 
import socket
from threading import Thread

def talk(conn):
    while True:
        msg = conn.recv(1024).decode()  #解码
        conn.send(msg.upper().encode())  #编码

sk = socket.socket() #创建
sk.bind(('127.0.0.1',9000)) #bind
sk.listen() #监听
while True:
    conn,addr = sk.accept() #接收
    Thread(target=talk,args = (conn,)).start() #创建线程并开启

- client.py -
import socket
sk = socket.socket() #创建

sk.connect(('127.0.0.1',9000)) #连接
while True:
    sk.send(b'hello') #
    print(sk.recv(1024)) #
多线程实现socketserver
import time
from threading import Thread

def func():
    while True:
        print('in func')
        time.sleep(0.5)

def func2():
    print('start func2')
    time.sleep(10)
    print('end func2')

Thread(target=func2).start()
t = Thread(target=func)
t.setDaemon(True)  #线程 守护操作
t.start()
print('主线程')
time.sleep(2)
print('主线程结束')

# 守护进程 只守护主进程的代码,主进程代码结束了就结束守护,守护进程在主进程之前结束
# 守护线程 随着主线程的结束才结束,守护线程是怎么结束的  直到主子线程都结束 进程结束

# 进程 terminate 强制结束一个进程的
# 线程 没有强制结束的方法
# 线程结束 : 线程内部的代码执行完毕 那么就自动结束了
守护线程 线程结束问题
import time
from threading import Thread,currentThread
def func():
    print(currentThread())

print('开始')
for i in range(10):
    Thread(target=func).start()
    # time.sleep(2)
print('主线程')
currentThread
锁 用来保证数据安全
有了GIL还是会出现数据不安全的现象,所以还是要用锁 
import time
from threading import Thread,Lock
n = 100
def func(lock): #
    global n  #用 全局的n
    # n -= 1
    with lock:
        tmp = n-1  # n-=1
        # time.sleep(0.1)
        n = tmp

if __name__ == '__main__':
    l = []
    lock = Lock() #实例化
    for i in range(100):
        t = Thread(target=func,args=(lock,)) #创建
        t.start() #开启
        l.append(t)
    for t in l:
        t.join()
    print(n)
锁 from threading import Thread,Lock
import dis
n = 1  #全局空间的 += -= 操作 都不是数据安全的
def func():
    n = 100  #局部空间内 永远安全
    n -= 1

dis.dis(func)

# 会出现线程不安全的两个条件
# 1.是全局变量
# 2.出现 += -=这样的操作

#下面是解析
LOAD  仅有这个 数据安全
STORE 有load store 就会不安全

# 列表 字典
# 方法 l.append l.pop l.insert dic.update 都是线程安全的
# l[0] += 1  不安全
# d[k] += 1  不安全
dis 模块 cpu计算底层步骤
# 科学家吃面问题
import time
from threading import Thread,Lock
# noodle_lock = Lock()
# fork_lock = Lock()
# 死锁不是时刻发生的,有偶然的情况整个程序都崩了
# 每一个线程之中不止一把锁,并且套着使用
# 如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制
# 而应看做一个资源
#
# def eat1(name):
#     noodle_lock.acquire()
#     print('%s拿到面条了'%name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     print('%s开始吃面'%name)
#     time.sleep(0.2)
#     fork_lock.release()
#     print('%s放下叉子了' % name)
#     noodle_lock.release()
#     print('%s放下面了' % name)
#
# def eat2(name):
#     fork_lock.acquire()
#     print('%s拿到叉子了' % name)
#     noodle_lock.acquire()
#     print('%s拿到面条了' % name)
#     print('%s开始吃面' % name)
#     time.sleep(0.2)
#     noodle_lock.release()
#     print('%s放下面了' % name)
#     fork_lock.release()
#     print('%s放下叉子了' % name)
#
# Thread(target=eat1,args=('wei',)).start()
# Thread(target=eat2,args=('hao',)).start()
# Thread(target=eat1,args=('太',)).start()
# Thread(target=eat2,args=('宝',)).start()

lock = Lock()
def eat1(name):
    lock.acquire()
    print('%s拿到面条了'%name)
    print('%s拿到叉子了'%name)
    print('%s开始吃面'%name)
    time.sleep(0.2)
    lock.release()
    print('%s放下叉子了' % name)
    print('%s放下面了' % name)

def eat2(name):
    lock.acquire()
    print('%s拿到叉子了' % name)
    print('%s拿到面条了' % name)
    print('%s开始吃面' % name)
    time.sleep(0.2)
    lock.release()
    print('%s放下面了' % name)
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('宝元',)).start()

# 先临时解决 fork_lock=noodle_lock = Lock()
# 然后再找到死锁的原因,再去修改  终极办法一把锁
科学家吃面问题 死锁现象
from threading import RLock,Lock,Thread

# 互斥锁
#     无论在相同的线程还是不同的线程,都只能连续acquire一次
#     要想再acquire,必须先release
# 递归锁
#     在同一个线程中,可以无限次的acquire
#     但是要想在其他线程中也acquire,
#     必须现在自己的线程中添加和acquire次数相同的release
rlock = RLock()  #每一次acquire都像进去一道门
rlock.acquire()
rlock.acquire()
rlock.acquire()
rlock.acquire()  #直到全都release 才能下个人进门
print('锁不住')

lock = Lock() #普通锁/互斥锁
lock.acquire()
print('1')  #到这里 hang住了 
lock.acquire()
print('2')
递归锁 RLock 互斥锁Lock
from threading import RLock
rlock = RLock()
def func(num):
    rlock.acquire() #
    print('aaaa',num)
    rlock.acquire()
    print('bbbb',num)
    rlock.release() #必须  还锁
    rlock.release() #必须

Thread(target=func,args=(1,)).start()
Thread(target=func,args=(2,)).start()
# aaaa 1
# bbbb 1
# aaaa 2
# bbbb 2
递归锁 1
import time
from threading import RLock,Lock,Thread
noodle_lock = fork_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了'%name)
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    print('%s开始吃面'%name)
    time.sleep(0.2)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s开始吃面' % name)
    time.sleep(0.2)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('宝元',)).start()
递归锁 科学家吃面 会多用占用资源   建议互斥锁
import time
from threading import Semaphore,Thread

def func(name,sem):
    sem.acquire()
    print(name,'start')
    time.sleep(1)
    print(name,'stop')
    sem.release()

sem = Semaphore(5)
for i in range(20):
    Thread(target=func,args=(i,sem)).start()
信号量 线程 不太好 比池
from threading import Event
# 事件
# wait() 阻塞 到事件内部标识为True就停止阻塞
# 控制标识
    # set
    # clear
    # is_set

# 连接数据库
import time
import random
from threading import Thread,Event
def connect_sql(e):
    count = 0
    while count < 3:
        e.wait(0.5)
        if e.is_set():
            print('连接数据库成功')
            break
        else:
            print('数据库未连接成功')
            count += 1

def test(e):
    time.sleep(random.randint(0,3))
    e.set()

e = Event()
Thread(target=test,args=(e,)).start() #测试
Thread(target=connect_sql,args=(e,)).start() #连接
事件Event 2个事情,一个事情要确认另一个事情才能开始做
from threading import Timer

def func():
    print('执行我啦')

t = Timer(3,func)
# 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
t.start()
print('主线程的逻辑')
t.join()
print('ok ')
定时器
# wait      阻塞
# notify(n) 给信号

# 假如现在有20个线程
# 所有的线程都在wait这里阻塞
# notify(n) n传了多少
# 那么wait这边就能获得多少个解除阻塞的通知

# notifyall
# acquire
# release

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition() #条件
    for i in range(10): #10个线程
        t = threading.Thread(target=run, args=(i,))
        t.start() #开启

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))

        con.release()
        print('****')

# 设置某个条件
# 如果满足这个条件 就可以释放线程
# 监控测试我的网速
# 20000个任务
# 测试我的网速 /系统资源
# 发现系统资源有空闲,我就放行一部分任务
条件 condition
import queue
#  线程队列 线程之间数据安全
q = queue.Queue()
# # 普通队列
# q.put(1)
# # print(q.get())
# try:
#     q.put_nowait(2)
# except queue.Full:
#     print('您丢失了一个数据2')
# print(q.get_nowait()) # 如果有数据我就取,如果没数据不阻塞而是报错


# 非阻塞的情况下
q.put(10)
print(q.get(timeout=2))
#
# # 算法里 栈
# lfq = queue.LifoQueue()   # 栈
# lfq.put(1)
# lfq.put(2)
# lfq.put(3)
# print(lfq.get())
# print(lfq.get())
# print(lfq.get())
#
# # 优先级队列,是根据第一个值的大小来排定优先级的
# # ascii码越小,优先级越高
# q = queue.PriorityQueue()
# q.put((2,'a'))
# q.put((1,'c'))
# q.put((1,'b'))
#
# print(q.get())

# 线程+队列 实现生产者消费者模型
线程队列
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor  #线程池
# from concurrent.futures import ProcessPoolExecutor as Pool #进程池

def func(num):
    print('in %s func'%num,currentThread())
    time.sleep(random.random())
    return num**2

tp = ThreadPoolExecutor(5)  #5个线程
ret_l = []
for i in range(30):
    ret = tp.submit(func,i) #提交
    ret_l.append(ret)
for ret in ret_l:  #取值
    print(ret.result())
线程池1
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool
import os
def func(num):
    # print('in %s func'%num,currentThread())
    print('in %s func'%num,os.getpid())
    time.sleep(random.random())
    return num**2
if __name__ == '__main__':

    # tp = ThreadPoolExecutor(5)
    tp = Pool(5)
    ret = tp.map(func,range(30))
    # print(list(ret))
    for i in ret:
        print(i)
线程池 map 简单用法
# 回调函数 add_done_callback
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool
def func1(num):
    print('in func1 ',num,currentThread())
    return num*'*'

def func2(ret):
    print('--->',ret.result(),currentThread())
tp = Pool(5)
print('主 : ',currentThread())
for i in range(10):
    tp.submit(func1,i).add_done_callback(func2)
# 回调函数收到的参数是需要使用result()获取的
# 回调函数是由谁执行的? 主线程
线程 回调函数
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool 
from urllib.request import urlopen
def func(name,url):
    content = urlopen(url).read()
    return name,content

def parserpage(ret):
    name,content = ret.result()
    with open(name,'wb') as f:
        f.write(content)

urls = {
    # 'baidu.html':'https://www.baidu.com',
    # 'python.html':'https://www.python.org',
    # 'openstack.html':'https://www.openstack.org',
    'github.html':'https://help.github.com/',
    'sina.html':'http://www.sina.com.cn/'
}

tp = Pool(2)
for k in urls:
    tp.submit(func,k,urls[k]).add_done_callback(parserpage)
多线程爬虫
线程
锁
    为什么有了GIL之后还需要锁
        多个线程同时操作全局变量的时候
        当出现"非原子性操作",例如+= -= *= /=
    l.append(1) 原子性操作
    a += 1  a = a + 1
        tmp = a +1
        a = tmp
死锁现象
    什么是死锁现象?
        两个以上的线程争抢同一把锁,
        其中一个线程获取到锁之后不释放
        另外的其他线程就都被锁住了
        比较容易出现问题的情况 : 两把锁套在一起用了
        死锁现象的本质 :代码逻辑问题
递归锁
    一把锁在同一个线程中acquire多次而不被阻塞
    如果另外的线程想要使用,必须release相同的次数,
    才能释放锁给其他线程
信号量
    控制几个线程同一时刻只能有n个线程执行某一段代码
    锁 + 计数器
事件
    两件事情
    一件事情要想执行依赖于另一个任务的结果
条件
    n个线程在某处阻塞
    由另一个线程控制这n个线程中有多少个线程能继续执行
定时器
    规定某一个线程在开启之后的n秒之后执行
队列栈优先级队列
    import queue
    线程之间数据安全
    多个线程get不可能同时取走一个数据,导致数据的重复获取
    多个线程put也不可能同时存入一个数据,导致数据的丢失
    队列 先进先出
    栈   先进后出
    优先级 优先级高的先出
线程池
    concurrent.futrues
    ThreadPoolExcuter
    ProcessPoolExcuter
submit 异步提交任务
shutdown 等待池内任务完成
result  获取进程函数的返回值
map     异步提交任务的简便用法
add_done_callback 回调函数
    进程 主进程执行
    线程
线程总结

协程 - 

进程 计算机中最小的资源分配单位
线程 计算机中能被CPU调度的最小单位
线程是由操作系统创建的,开启和销毁仍然占用一些时间
调度
    1.一条线程陷入阻塞之后,这一整条线程就不能再做其他事情了
    2.开启和销毁多条线程以及cpu在多条线程之间切换仍然依赖操作系统
你了解协程 ?
    了解
    协程(纤程,轻型线程)
    对于操作系统来说协程是不可见的,不需要操作系统调度
    协程是程序级别的操作单位
协程效率高不高
    和操作系统本身没有关系,和线程也没有关系
    而是看程序的调度是否合理
协程指的只是在同一条线程上能够互相切换的多个任务
遇到io就切换实际上是我们利用协程提高线程工作效率的一种方式
协程 只和程序相关 代码级别的
# 切换 + 状态保存  yield
import time
def consumer(res):
    '''任务1:接收数据,处理数据'''
    pass

def producer():
    '''任务2:生产数据'''
    res=[]
    for i in range(100000):  #1亿次
        res.append(i)
    return res
#
start=time.time()
res=producer()
consumer(res) # 写成consumer(producer())会降低执行效率
stop=time.time()
print(stop-start)


import time
def consumer():
    while True:
        res = yield

def producer():
    g = consumer()
    next(g)
    for i in range(100000):
        g.send(i)
start =time.time()
producer()
print(time.time() - start)

# yield这种切换 就已经在一个线程中出现了多个任务,这多个任务之前的切换 本质上就是协程,consumer是一个协程,producer也是一个协程
# 单纯的切换还会消耗时间
# 但是如果能够在阻塞的时候切换,并且多个程序的阻塞时间共享,协程能够非常大限度的提高效率
原生协程
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的
# greenlet  协程模块 在多个任务之间来回切换
# gevent 基于greenlet实现的,多个任务交给gevent管理,遇到IO就使用greenlet进行切换

import time
from greenlet import greenlet

def play():
    print('start play')
    g2.switch()  # 开关
    time.sleep(1)
    print('end play')
def sleep():
    print('start sleep')
    time.sleep(1)
    print('end sleep')
    g1.switch()
g1 = greenlet(play)
g2 = greenlet(sleep)
g1.switch()  # 开关
代码 协程
import time
import gevent
def play():   # 协程1
    print(time.time())
    print('start play')
    gevent.sleep(1)
    print('end play')
def sleep():  # 协程2
    print('start sleep')
    print('end sleep')
    print(time.time())

g1 = gevent.spawn(play)
g2 = gevent.spawn(sleep)
# g1.join()
# g2.join()  # 精准的控制协程任务,一定是执行完毕之后join立即结束阻塞
gevent.joinall([g1,g2])
greelet 
from gevent import monkey;monkey.patch_all()
import time
import gevent
url_lst = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/']

def get_page(url):
    ret = urlopen(url).read()
    return ret.decode('utf-8')
start = time.time()
g_l = []
for url in url_lst:
    g = gevent.spawn(get_page,url)
    g_l.append(g)
#
gevent.joinall(g_l)
print(time.time()-start)
协程 爬虫
- server.py -
from gevent import monkey;monkey.patch_all()
import socket
import gevent

def talk(conn):
    while True:
        msg = conn.recv(1024).decode()
        conn.send(msg.upper().encode())

sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()

while True:
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)

- client.py - 
import socket
import threading
def task():
    sk = socket.socket()
    sk.connect(('127.0.0.1',9000))
    while True:
        sk.send(b'hello')
        print(sk.recv(1024))

for i in range(500):
    threading.Thread(target=task).start()
协程 socket 并发 
协程
一条线程在多个任务之间相互切换
数据安全的
不能利用多核
能够规避一个线程上的IO阻塞

一条线程能够起500个协程
4c的机器
5个进程
每一个进程20个线程
每一个线程500个协程
5*20*500 = 50000
协程总结 一条线程上的 规避IO
原文地址:https://www.cnblogs.com/zhangchen-sx/p/11019951.html