并发编程

一、操作系统 

一、多道操作系统
    有多个程序在操作系统中执行
    当一个程序遇到IO操作的时候就把CPU让出来给其他程序用

二、分时操作系统
    时间片
    不是遇到IO的时候才让出CPU,而是时间到了就将CPU让出来
    切换要占时间 :单纯的分时系统没有提高CPU的利用率

三、实时系统
    及时响应。每一个信息接收、分析处理和发送的过程必须在严格的时间限制内完成。
    高可靠性。需采取冗余措施,双机系统前后台工作,也包括必要的保密措施等

四、个人计算机操作系统

五、分布式操作系统
View Code

二、进程

进程
    进程就是运行中的程序,没有运行的程序是一个文件
    pid是一个全系统唯一的对某个进程的标识
    随着进程的重启,pid也可能会变化
    进程是操作系统中最小的资源分配单位

    同一个程序执行两次,就会在操作系统中出现两个进程,
    所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。

进程调度
    先来先服务 FCFS
    短作业优先算法
    时间片轮转法
    多级反馈队列算法

同步阻塞:不能充分利用cpu
异步非阻塞:过度利用cpu
IO多路复用:比较完善的在网络编程中的解决方案
View Code

三、multiprocess模块

import os
import time
from multiprocessing import Process

def func():
    for i in range(10):
        time.sleep(0.5)
        print('子进程:',os.getpid(),os.getppid())

if __name__ == '__main__':
    print('主进程',os.getpid(),os.getppid())
    p = Process(target=func)
    p.start()
    for i in range(10):
        time.sleep(0.3)
        print('*'*i)

# 给子进程传参数
def fun(arg):
    for i in range(10):
        time.sleep(0.5)
        print('子进程:%s'%arg,os.getpid(),os.getppid())


if __name__ == '__main__':
    print('主进程:',os.getpid(),os.getppid())
    p = Process(target=fun,args=(1,))
    p.start()
    for i in range(10):
        time.sleep(0.3)
        print('*'*i)

# 进程之间数据隔离问题
count = 100
def fun():
    global count
    count -= 1
    print('子进程:',count)

if __name__ == '__main__':
    print('主进程',os.getpid(),os.getppid())
    p = Process(target=fun)
    p.start()
    time.sleep(3)
    print('主进程',count)

# 启动多个进程
def fun(arg):
    print('子进程:%s' %arg,os.getpid(),os.getppid())

if __name__ == '__main__':
    for i in range(10):
        Process(target=fun,args=(i,)).start()

# 子进程和父进程之间的关系
def fun(arg):
    print('子进程:%s' %arg,os.getpid(),os.getppid())
    time.sleep(5)
    print('子进程end')

if __name__ == '__main__':
    for i in range(10):
        Process(target=fun,args=(i,)).start()
    print('父进程')

# 1.父进程和子进程的启动是异步的
# 父进程只负责通知操作系统启动子进程
# 接下来的工作由操作系统接手 父进程继续执行
# 2.父进程执行完毕之后并不会直接结束程序,
# 而是会等待所有的子进程都执行完毕之后才结束
# 父进程要负责回收子进程的资源
View Code

四、Process类

import time
import random
import os
from multiprocessing import Process
# join同步控制
def fun(index):
    time.sleep(random.random())
    print('第%s个邮件已经发送完毕' %index)

if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = Process(target=fun,args=(i,))
        p.start()
        p_lst.append(p)
    for p in p_lst:
        p.join()
    print('10个邮件已经发送完毕')

# 开启进程的第二种方式
class MyProcess(Process):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg

    def run(self):
        print('子进程',os.getpid(),os.getppid(),self.arg)

if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = MyProcess(i)
        p.start()
        p_lst.append(p)
    for p in p_lst:
        p.join()
    print('主进程', os.getpid())
View Code

五、守护进程

import time
from multiprocessing import Process

def func1():
    count = 1
    while True:
        time.sleep(0.5)
        print(count * '*')
        count += 1

def func2():
    print('func2 start')
    time.sleep(5)
    print('func2 end')

if __name__ == '__main__':
    p1 = Process(target=func1)
    p1.daemon = True
    p1.start()
    Process(target=func2).start()
    time.sleep(3)
    print('主进程')
View Code

六、锁

import time
import json
from multiprocessing import Process,Lock

def search(person):
    with open('ticket.txt') as f:
        dic = json.load(f)
    time.sleep(0.2)
    print('%s查询到余票%s' % (person,dic['count']))

def get_ticket(person):
    with open('ticket.txt') as f:
        dic = json.load(f)
    time.sleep(0.2)
    if dic['count'] > 0:
        print('%s买到票了' %person)
        dic['count'] -= 1
        time.sleep(0.2)
        with open('ticket.txt','w') as f:
            json.dump(dic,f)
    else:
        print('%s没买到票' %person)

def ticket(person,lock):
    search(person)
    lock.acquire()
    get_ticket(person)
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=ticket,args=(i,lock))
        p.start()
View Code

七、信号量

import time
import random
from multiprocessing import Process,Semaphore

def ktv(person,sem):
    sem.acquire()
    print('%s走进ktv' %person)
    time.sleep(random.randint(1,5))
    print('%s走出ktv' %person)
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(10):
        p = Process(target=ktv,args=('person%s' %i,sem))
        p.start()

# 信号量的实现机制:计数器+锁
View Code

八、事件

import time
import random
from multiprocessing import Process,Event

def triffic_light(e):
    print("33[31;1m红灯33[0m")
    while 1:
        if not e.is_set():
            time.sleep(2)
            print("33[32;1m绿灯33[0m")
            e.set()
        else:
            time.sleep(2)
            print("33[31;1m红灯33[0m")
            e.clear()

def car(e,i):
    if not e.is_set(): #True
        print("%s 等待" %i)
        e.wait()
    print("%s 通过" %i)

if __name__ == '__main__':
    e = Event()
    p1 = Process(target=triffic_light, args=(e,))
    p1.daemon = True
    p1.start()
    l = []
    for i in range(10):
        time.sleep(random.randrange(0,2))
        p = Process(target=car,args=(e,"car%s" %i))
        p.start()
        l.append(p)
    for p in l:p.join()
View Code

九、队列

import time
from multiprocessing import Queue,Process

q = Queue(2)
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

# 查看队列是否已满
print(q.full())
# 查看队列是否为空
print(q.empty())

try:
    q.put_nowait()
except:
    print('队列已经满了')

try:
    print(q.get_nowait())
except:
    print('队列已经空了')


def fun(q):
    print('son--->',q.get())
    q.put('abc')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=fun,args=(q,))
    p.start()
    q.put({'123':123})
    p.join()
    print('Foo--->',q.get())
View Code

十、生产者消费模型

import time
import random
from multiprocessing import Process,Queue

# 消费者
def consumer(q,name):
    while 1:
        food = q.get()
        if food is None:break
        time.sleep(random.uniform(0.5,1))
        print('%s 吃了一个 %s' %(name,food))

# 生产者
def producer(q,name,food):
    for i in range(10):
        time.sleep(random.uniform(0.3,0.8))
        print('%s 生产了 %s%s' %(name,food,i))
        q.put(food+str(i))

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=consumer,args=(q,'张三'))
    p2 = Process(target=consumer,args=(q,'小五'))
    p1.start()
    p2.start()
    c1 = Process(target=producer,args=(q,'小黑','包子'))
    c2 = Process(target=producer,args=(q,'小红','馒头'))
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    q.put(None)
    q.put(None)
View Code

十一、joinablequeue

import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    #处理数据
    while 1:
        food = q.get()
        time.sleep(random.uniform(0.5,1))
        print('%s吃了一个%s' %(name,food))
        q.task_done() #通知队列已经有一个数据被处理了

def producer(p,name,food):
    #生产数据
    for i in range(2):
        time.sleep(random.uniform(0.3,0.8))
        print('%s生产了%s%s' %(name,food,i))
        p.put(food+str(i))

if __name__ == '__main__':
    q = JoinableQueue()
    c1 = Process(target=consumer,args=(q,'张三'))
    c2 = Process(target=consumer,args=(q,'王五'))
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1 = Process(target=producer,args=(q,'小明','包子'))
    p2 = Process(target=producer,args=(q,'小红','馒头'))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    q.join() #阻塞直到放入队列中所有的数据都被处理掉(有多少个数据就接收到了多少taskdone)
View Code

十二、管道

'''
队列是基于管道实现的
管道是基于socket实现的
队列 + 锁 简便的IPC机制 使得进程之间的数据是安全的
管道 进程之间数据不安全,且存取数据复杂
socket + pickle
'''

# from multiprocessing import Pipe
# left,right = Pipe()
# left.send(123)
# print(right.recv())

import time
from multiprocessing import Process,Pipe

# def consumer(left,right):
#     time.sleep(1)
#     print(right.recv())
#
# if __name__ == '__main__':
#     left,right = Pipe()
#     Process(target=consumer,args=(left,right)).start()
#     left.send(456)

def consumer(left,right):
    left.close()
    while 1:
        try:
            print(right.recv())
        except EOFError:
            break


if __name__ == '__main__':
    left,right = Pipe()
    Process(target=consumer,args=(left,right)).start()
    right.close()
    for i in range(5):
        left.send('包子%s' %i)
    left.close()

# pipe的端口管理不会随着某一个进程的关闭就关闭
# 操作系统来管理进程对这些端口的使用
# left,right
# left,right
# 操作系统管理4个端口  每关闭一个端口计数-1,直到所有的端口都关闭了,
# 剩余1个端口的时候 recv就会报错
View Code

十三、进程池

'''
为什么要有进程池?
开启过多的进程并不能提高你的效率
反而会降低效率

计算密集型:充分占用cpu,多进程可以充分利用多核
    适合开启多进程,但是不适合开启很多多进程
IO密集型:大部分时间都在阻塞队列,而不是在运行状态中
    根本不太适合开启多进程
'''
import time
import os
from multiprocessing import Process,Pool

def func(num):
    print('做了%s件衣服' %num)

if __name__ == '__main__':
    start = time.time()
    p = Pool()
    for i in range(100):
        p.apply_async(func,args=(i,)) #异步提交func到一个子进程中执行
    p.close() #关闭进程池,用户不能再向这个池中提交任务了
    p.join()  #阻塞,直到进程池中所有的任务都被执行完
    print(time.time()-start)

    start = time.time()
    p_lst = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p.start()
        p_lst.append(p)
    for p in p_lst:p.join()
    print(time.time()-start)

def task(num):
    time.sleep(1)
    print('%s : %s' %(num,os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    for i in range(10):
        res = p.apply(task,args=(i,)) #同步提交任务
        print('-->',res)

def task(num):
    time.sleep(1)
    print('%s : %s' %(num,os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    p_lst = []
    for i in range(10):
        res = p.apply_async(task,args=(i,))
        p_lst.append(res)
    for res in p_lst:
        print(res.get())
    p.close()
    p.join()

def fun(num):
    time.sleep(1)
    print('%s : %s' %(num,os.getpid()))
    return num**2

if __name__ == '__main__':
    p = Pool()
    p.map(fun,range(10))

# 实例化 传参数 进程的个数 cpu/cpu+1
# 提交任务
    # 同步提交 apply
        # 返回值 : 子进程对应函数的返回值
        # 一个一个顺序执行的,并没有任何并发效果
    # 异步提交 apply_async
        # 没有返回值,要想所有任务能够顺利的执行完毕
            # p.close()
            # p.join() # 必须先close再join,阻塞直到进程池中的所有任务都执行完毕
        # 有返回值的情况下
            # res.get() # get不能再提交任务之后立刻执行,应该是先提交所有的任务再通过get获取结果
        # map()方法
            # 异步提交的简化版本
            # 自带close和join方法
View Code

十四、数据共享

# 数据共享的机制
    # 支持数据类型非常有限
    # list dict都不是数据安全的,你需要自己加锁来保证数据安全

from multiprocessing import Manager,Process,Lock

def work(d,lock):
    with lock:
        d['count'] -= 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count':100})
        p_lst = []
        for i in range(100):
            p = Process(target=work,args=(dic,lock))
            p.start()
            p_lst.append(p)
        for p in p_lst:p.join()
        print(dic)
View Code

十五、threading模块

import os
import time
from threading import Thread
from multiprocessing import Process

# 并发
def fun(i):
    print('子:',i,os.getpid())

print('主:',os.getpid())
for i in range(10):
    t = Thread(target=fun,args=(i,))
    t.start()

# 轻量级
def fun(i):
    print('子:',i,os.getpid())

if __name__ == '__main__':
    start = time.time()
    t_lst = []
    for i in range(100):
        t = Thread(target=fun,args=(i,))
        t.start()
        t_lst.append(t)
    for t in t_lst:t.join()
    tt = time.time()-start

    start = time.time()
    p_lst = []
    for i in range(100):
        p = Process(target=fun,args=(i,))
        p.start()
        p_lst.append(p)
    for p in p_lst:p.join()
    pt = time.time()-start

    print('线程:%s,进程:%s' %(tt,pt))

# 数据共享
num = 100
def fun():
    global num
    num -= 1

if __name__ == '__main__':
    t_lst = []
    for i in range(100):
        t = Thread(target=fun)
        t.start()
        t_lst.append(t)
    for t in t_lst:t.join()
    print(num)

'''
Thread实例对象的方法:
    isAlive(): 返回线程是否活动的。
    getName(): 返回线程名。
    setName(): 设置线程名。

threading模块提供的一些方法:
    threading.currentThread(): 返回当前的线程变量。
    threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
'''
from threading import currentThread
from threading import enumerate
from threading import active_count

def fun():
    print('子:', currentThread().ident)
    time.sleep(3)

print('主:',currentThread().ident)
if __name__ == '__main__':
    # t = Thread(target=fun)
    # t.start()
    # print(t.is_alive())
    # print(t.getName())
    # t.setName('t1')
    # print(t.getName())

    for i in range(10):
        t = Thread(target=fun)
        t.start()
    # print(len(enumerate()))
    print(active_count())
View Code

十六、回调函数

# 回调函数是由主进程实现的
# 子进程有大量的计算要去做,回调函数等待结果做简单处理

url_lst = [
    'http://www.baidu.com',
    'http://www.taobao.com',
    'http://www.jd.com',
    'http://www.mi.com'
]

import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_url(url):
    response = urlopen(url)
    ret = re.search('www.(.*?).com',url)
    print('%s finished'%ret.group(1))
    return ret.group(1),response.read()

def call(content):
    url,con = content
    with open(url+'.html','wb') as f:
        f.write(con)

if __name__ == '__main__':
    p = Pool()
    for url in url_lst:
        p.apply_async(get_url,args=(url,),callback=call)
    p.close()
    p.join()
View Code

十七、线程理论

# 进程是
    # 计算机中最小的资源分配单位
    # 进程对于操作系统来说还是有一定负担
    # 创建一个进程 操作系统要分配的资源大致有 :
        # 代码
        # 数据
        # 文件

# 为什么要有线程
    # 轻量级的概念
    # 他没有属于自己的进程资源:
        # 一条线程只负责执行代码,没有自己独立的
        # 代码、变量、文件资源
# 什么是线程
    # 线程是计算机中被CPU调度的最小单位
    # 你的计算机当中的cpu都是执行的线程中的代码
# 线程和进程之间的关系
    # 每一个进程中都有至少一条线程在工作
# 线程的特点
    # 同一个进程中的所有线程的资源是共享的
    # 轻量级 没有自己的资源
# 进程和线程之间的区别
    # 占用的资源
    # 调度的效率
    # 资源是否共享
# 通用的问题
    # 一个进程中的多个线程能够并行么?
    # java c++ c#

# python中的线程
    # 一个进程中的多个线程能够并行么? 不行
    # python是一个解释型语言
        # 为什么不行?
        # Cpython解释器 内部有一把全局解释器锁 GIL
            # 所以线程不能充分的利用多核
            # 同一时刻用一个进程中的线程只有一个能被CPU执行
        # GIL锁 确实是限制了你的程序效率
        # GIL锁 目前 是能够帮助你在线程的切换中提高效率
View Code

十八、守护线程

import time
from threading import Thread

def fun1():
    while 1:
        time.sleep(0.5)
        print(123)

def fun2():
    print('fun2 start')
    time.sleep(3)
    print('fun3 end')

t1 = Thread(target=fun1)
t2 = Thread(target=fun2)
t1.daemon = True
t1.start()
t2.start()
print('主线程代码结束')

# 守护线程 是在主线程代码结束之后,还等待了子线程执行结束才结束
# 主线程结束 就意味着主进程结束
# 主线程等待所有的线程结束
# 主线程结束了之后 守护线程随着主进程的结束自然结束了
View Code

十九、线程与进程的关系

线程与进程之间的关系
    每个进程内都有一个线程
    线程是不能独立存在的
    线程和进程之间的区别
    同一个进程中线程之间的数据是共享的
    进程之间的数据是隔离的
    线程是被cpu执行的最小单位
        操作系统调度
    进程是计算机中最小的资源分配单位

python
    GIL锁 全局解释器锁 全局锁
    cpython解释器中的
    锁线程 :同一时刻同一个进程只会有一个线程访问CPU
    锁的是线程而不是数据
    当程序是高IO型的 多线程
    当程序是高计算(CPU)型的 多进程
        cpu*1 ~ cpu*2

守护线程 :主线程结束之后才结束

线程为什么要有锁
GIL
    线程之间的数据安全问题 :
    += -= 赋值操作不安全
    pop append 都是线程安全的
    队列也是数据安全的
View Code

二十、互斥锁和数据安全

from threading import Thread,Lock
n = 0
def func(lock):
    global n
    for i in range(1500000):
        lock.acquire()
        n  -= 1
        lock.release()

def func2(lock):
    global n
    for i in range(1500000):
        lock.acquire()
        n += 1
        lock.release()

if __name__ == '__main__':
    t_lst = []
    lock = Lock()
    for i in range(10):
        t2 = Thread(target=func2,args=(lock,))
        t = Thread(target=func,args=(lock,))
        t.start()
        t2.start()
        t_lst.append(t)
        t_lst.append(t2)
    for t in t_lst:
        t.join()
    print('-->',n)
View Code

21、递归锁

1、 死锁现象
    两把锁,异步的
    操作的时候 抢到一把锁之后还要再去抢第二把锁
    一个线程抢到一把锁,另一个线程抢到了另一把锁
2、 递归锁可以解决互斥锁的死锁问题
    互斥锁
        两把锁
        多个线程抢
    递归锁
        一把锁
        多个线程抢
3、 递归锁好不好?
    递归锁并不是一个好的解决方案
    死锁现象的发生不是互斥锁的问题
    而是程序员的逻辑有问题导致的
    递归锁能够快速的解决死锁问题
4、 递归锁
    迅速恢复服务 递归锁替换互斥锁
    在接下来的时间中慢慢把递归锁替换成互斥锁
    能够完善代码的逻辑
    提高代码的效率
多个线程之间,用完一个资源再用另外一个资源
先释放一个资源,再去获取一个资源的锁

import time
from threading import Thread,RLock

noodle_lock = fork_lock = RLock()

def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了' %name)
    fork_lock.acquire()
    print('%s拿到叉子了' %name)
    print('%s吃面' %name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s放下叉子' %name)
    noodle_lock.release()
    print('%s放下筷子' %name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了'%name)
    print('%s吃面'%name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s放下面'%name)
    fork_lock.release()
    print('%s放下叉子' % name)

if __name__ == '__main__':
    name1_list = ['小明','小花']
    name2_list = ['大王','小王']
    for name in name1_list:
        Thread(target=eat1,args=(name,)).start()
    for name in name2_list:
        Thread(target=eat2,args=(name,)).start()
View Code

22、信号量

import time
from threading import Semaphore,Thread

def func(index,sem):
    sem.acquire()
    print(index)
    time.sleep(1)
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(5)
    for i in range(10):
        Thread(target=func,args=(i,sem)).start()
View Code

23、事件

# 检测数据库连接
# wait() 等待 事件内的信号编程True
# set()  把信号变成True
# clear  把信号变成False
# is_set 查看信号状态是否为True
# from threading import Event
#
# e = Event()
# print(e.is_set())
# e.wait(timeout = 5)
# print(e.is_set())
import time
import random
from threading import Event,Thread
def check(e):
    print('开始检测数据库连接')
    time.sleep(random.randint(1,5))  # 检测数据库连接
    e.set()  # 成功了

def connect(e):
    for i in range(3):
        e.wait(0.5)
        if e.is_set():
            print('数据库连接成功')
            break
        else:
            print('尝试连接数据库%s次失败'%(i+1))
    else:
        raise TimeoutError

e = Event()
Thread(target=connect,args=(e,)).start()
Thread(target=check,args=(e,)).start()
View Code

24、条件

# notify 控制流量 通知有多少人可以通过了
# wait 在门口等待的所有人

# acquire
# wait 使用前后都需要加锁
# 做的事情
# release

# acquire
# notify 使用前后都需要加锁
# release

from threading import Condition,Thread
def func(con,index):
    print('%s在等待'%index)
    con.acquire()
    con.wait()
    print('%s do something'%index)
    con.release()

con = Condition()
for i in range(10):
    t = Thread(target=func,args=(con,i))
    t.start()
con.acquire()
con.notify_all()
con.release()
# count = 10
# while count > 0:
#     num= int(input('>>>'))
#     con.acquire()
#     con.notify(num)
#     count -= num
#     con.release()
View Code

25、定时器

from threading import Timer
def func():
    print('执行我啦')

t = Timer(3600,func)
t.start()
print('主线程')
View Code

26、队列

# from queue import Queue

# qps 每秒钟接受的请求数
# 帮助你维持程序相应的顺序

# 栈完成算法 后进先出
from queue import LifoQueue

lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)
lq.put('a')
lq.put('b')
print(lq.get())
print(lq.get())
print(lq.get())

# 优先级队列
from queue import PriorityQueue

pq = PriorityQueue()
pq.put((15,'abc'))
pq.put((5,'ghi'))
pq.put((12,'def'))
pq.put((12,'aaa'))

print(pq.get())
print(pq.get())
print(pq.get())

from queue import PriorityQueue
pq = PriorityQueue()
pq.put(15)
pq.put(5)
pq.put(12)
pq.put(12)

print(pq.get())
print(pq.get())
print(pq.get())

from queue import PriorityQueue
pq = PriorityQueue()
pq.put('abc')
pq.put('def')

print(pq.get())
print(pq.get())
View Code

27、线程池进程池

import time
import os
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread

# def fun(i):
#     print('thread',i)
#     time.sleep(1)
#     print('thread %s end'%i)

# if __name__ == '__main__':
#     tp = ProcessPoolExecutor(5)
#     tp.submit(fun,1)
#     tp.shutdown()
#     print('主线程')

# def fun(i):
#     print('thread',i,current_thread().ident)
#     time.sleep(1)
#     print('thread %s end' %i)

# tp = ThreadPoolExecutor(5)
# for i in range(20):
#     tp.submit(fun,i)
# tp.shutdown()
# print('主线程')

# 获取返回值
# def fun(i):
#     print('thread',i,current_thread().ident)
#     time.sleep(1)
#     print('thread %s end' %i)
#     return i* '*'

# tp = ThreadPoolExecutor(5)
# res_l = []
# for i in range(20):
#     res = tp.submit(fun,i)
#     res_l.append(res)
# for res in res_l:
#     print(res.result())
# print('主线程')

# map
# def fun(i):
#     print('thread',i)
#     time.sleep(1)
#     print('thread %s end' %i)
#     return i* '*'

# tp = ThreadPoolExecutor(5)
# res = tp.map(fun,range(20))
# for i in res:print(i)
        
# 回调函数
# 线程池的回调函数是由子线程完成的
# def func(i):
#     print('thread',i,current_thread().ident)
#     time.sleep(1)
#     print('thread %s end'%i)
#     return i* '*'

# def call_back(arg):
#     print('call back : ',current_thread().ident)
#     print('ret : ',arg.result())

# tp = ThreadPoolExecutor(5)
# for i in range(20):
#     tp.submit(func,i).add_done_callback(call_back)
# tp.shutdown()
# print('主线程',current_thread().ident)

# 进程池
# def fun(i):
#     print('process',i,os.getpid())
#     time.sleep(1)
#     print('process %s end' %s)

# if __name__ == '__main__':
#     tp = ProcessPoolExecutor(5)
#     tp.submit(fun,1)
#     tp.shutdown()
#     print('主进程',os.getpid())

# 进程池回调函数
# 回调函数是主进程来完成的
# def fun(i):
#     print('process',i,os.getpid())
#     time.sleep(1)
#     print('process %s end' %i)
#     return i* '*'

# def call_back(arg):
#     print('call_back : ',os.getpid())
#     print('res : ',arg.result())

# if __name__ == '__main__':
#     tp = ProcessPoolExecutor(5)
#     for i in range(20):
#         tp.submit(fun,i).add_done_callback(call_back)
#     print('主进程',os.getpid())

'''
回调函数
    进程池:是由主进程完成的
    线程池:是由子线程完成的
'''

'''
线程池/进程池
    实例化线程池 ThreadPoolExecutor 5*cpu_count
    异步提交任务 submit/map(没有回调函数)
    阻塞直到任务完成 shutdown
    获取子线程的返回值 result
    回调函数 add_done_callback
'''
View Code

28、协程

# def my_generator():
#     for i in range(10):
#         yield i #保存当前程序的状态

# for num in my_generator():
#     print(num)

# def consumer():
#     g = producer()
#     for num in g:
#         print(num)

# def producer():
#     for i in range(1000):
#         yield i
    
# consumer()
# 协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率
'''
更好的利用协程
一个线程的执行明确的切开
两个人物,帮助你记住哪个任务执行到哪个位置上了,并且实现安全的切换
一个任务不得不陷入阻塞了,在这个任务阻塞的过程中切换到另一个任务继续执行
你的程序只要还有任务需要执行 你的当前线程永远不会阻塞
利用协程在多个任务陷入阻塞的时候进行切换来保证一个线程在处理多个任务的时候总是忙碌的
能够更加充分的利用CPU,抢占更多的时间片
无论是进程、还是线程都是由操作系统来切换的,开启过多的线程、进程会给操作系统的调度增加负担
如果我们是使用协程,协程在程序之间的切换操作系统感知不到,无论开启多少个协程对操作系统来说总是一个线程
操作系统的调度不会有任何压力
'''
# 协程模块
# greenlet gevent的底层,协程,切换的模块
# gevent 直接用的,gevent
# import time
# from greenlet import greenlet

# def eat():
#     print('eating 1')
#     g2.switch()
#     time.sleep(1)
#     print('eating 2')

# def play():
#     print('playing 1')
#     g1.switch()
#     time.sleep(1)
#     print('playing 2')

# g1 = greenlet(eat)
# g2 = greenlet(play)
# g1.switch()

# import gevent

# def eat():
#     print('eating 1')
#     time.sleep(1)
#     print('eating 2')

# def play():
#     print('playing 1')
#     time.sleep(1)
#     print('playing 2')

# g1 = gevent.spawn(eat) #自动检测阻塞事件,遇见阻塞了就会进行切换,有些阻塞不认识
# g2 = gevent.spawn(play)
# g1.join()
# g2.join()

from gevent import monkey;monkey.patch_all()
import time 
import gevent

def eat():
    print('eating 1')
    time.sleep(1)
    print('eating 2')

def play():
    print('playing 1')
    time.sleep(1)
    print('playing 2')

g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()
View Code

29、local

# 多个线程之间,使用threading.local对象,可以实现多个线程之间的数据隔离
import time
import random
from threading import local,Thread

loc = local()
def fun2():
    global loc
    print(loc.name,loc.age)

def fun(name,age):
    global loc
    loc.name = name
    loc.age = age
    time.sleep(random.random())
    fun2()

Thread(target=fun,args=('zhangsan',40)).start()
Thread(target=fun,args=('lisi',28)).start()
View Code

30、IO模型

# 同步
    # 一件事情做完再做另一件事情
# 异步
    # 同时做多件事情
    # 相对论 多线程 多进程 协程 异步的程序
        # 宏观角度 :异步 并发聊天
# 阻塞
    # sleep input join shutdown get acquire wait
    # accept recv recvfrom
# 非阻塞
    # setblocking(False)

# 网络IO模型
# socket
# 用socket 一定会用到accept recv recvfrom这些方法
# 正常情况下 accept recv recvfrom都是阻塞的
# 如果setblocking(False) 整个程序就变成一个非阻塞的程序了

# 阻塞IO
# recv做了哪些事情
# 阻塞IO的recv,
    # wait for data阶段     阻塞
    # copy data   阶段      阻塞

# 非阻塞IO
# TODO:123
import time
import socket
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.setblocking(False)   # 设置当前的socket server为一个非阻塞IO模型
sk.listen()
conn_l = []
del_l = []
while True:
    try:
        conn,addr = sk.accept()
        conn_l.append(conn)   # [conn1,conn2]
    except BlockingIOError:
        for conn in conn_l:   # [conn1,conn2]
            try:
                conn.send(b'hello')
                print(conn.recv(1024))
            except (NameError,BlockingIOError):pass
            except ConnectionResetError:
                conn.close()
                del_l.append(conn)
        for del_conn in del_l:
            conn_l.remove(del_conn)
        del_l.clear()

# 没有并发编程的机制
# 同步的程序
# 非阻塞的特点
# 程序不会再某一个连接的recv或者sk的accept上进行阻塞
# 我就有更多的事件来做信息的收发工作

# 太多while True 高速运行着
# 大量的占用了CPU导致了资源的浪费

# 阻塞IO的问题:
#     一旦阻塞就不能做其他事情了
# 非阻塞IO的问题:
#     给CPU造成了很大的负担

import socket

sk = socket.socket()
sk.connect(('127.0.0.1',9000))
for i in range(1000000):
    sk.send(b'world')
    print(sk.recv(1024))   # 收到对面的消息

sk.close()
View Code

31、IO多路复用

import select  #  模块
import socket
# 用来操作操作系统中的select(IO多路复用)机制
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.setblocking(False)
sk.listen()

r_lst = [sk,]
print(sk)
while True:
    r_l,_,_ = select.select(r_lst,[],[])  # r_lst = [sk,conn1,conn2,conn3]
    for item in r_l:
        if item is sk:
            conn, addr = sk.accept()
            r_lst.append(conn)
        else:
            try:
                print(item.recv(1024))
                item.send(b'hello')
            except ConnectionResetError:
                item.close()
                r_lst.remove(item)

import socket

sk = socket.socket()
sk.connect(('127.0.0.1',9000))
for i in range(1000000):
    sk.send(b'world')
    print(sk.recv(1024))   # 收到对面的消息

sk.close()

# io多路复用机制
# select windows、maclinux
    # 底层是操作系统的轮询
    # 有监听对象个数的限制
    # 随着监听对象的个数增加,效率降低
# poll maclinux
    # 底层是操作系统的轮询
    # 有监听对象个数的限制,但是比select能监听的个数多
    # 随着监听对象的个数增加,效率降低
# epoll maclinux
    # 给每一个要监听的对象都绑定了一个回调函数
    # 不再受到个数增加 效率降低的影响

# socketserver
    # IO多路复用 + threading线程
# selectors模块
    # 帮助你在不同的操作系统上进行IO多路复用机制的自动筛选


# IO多路复用
# asycio tornado twisted
View Code


原文地址:https://www.cnblogs.com/echo-up/p/9669310.html