day42 Pyhton 并发编程05

一.内容回顾

# 线程
# 正常的编程界:
    # 进程
        # 计算机中最小的资源分配单位
        # 数据隔离
        # 进程可以独立存在
        # 创建与销毁 还有切换 都慢 给操作系统压力大
   # 线程
        # 计算机中能被CPU调度的最小单位
        # 同一个进程中的多个线程资源共享
        # 线程必须依赖进程存在
        # 创建与销毁 还有切换 都比进程快很多
# Cpython解释器下
    # GIL 全局解释器锁
    # 保证了同一时刻下只有一个线程可以被CPU操作

# threading模块
# 创建子线程 Thread类
    # start 开启子线程
    # join  阻塞等待子线程结束
    # setDeamon 设置守护线程
        # 会等待主线程结束(包含所有非守护的子线程)之后守护线程才结束
# currentthread,enumerate,activecount
# 查看当前线程,所有的线程对象组成的列表,列表的长度
# from threading import Thread,currentThread
# def func():
#     print(currentThread())
# for i in range(10):
#     Thread(target=func).start()

今日内容

# 1.锁   ****
    # 互斥锁
    # 死锁现象
    # 递归锁
# 2.其他模型 **
    # 信号量
    # 事件
    # 条件
    # 定时器
# 3.线程队列 ****
# 4.线程池模块 ****
# 锁是用来做什么的?
# 保证数据的安全的
# GIL是干什么的?
# 全局解释器锁线程
# 有了GIL还要锁干啥?
# 有了GIL还是会出现数据不安全的现象,所以还是要用锁
# import time
# from threading import Thread,Lock
# n = 100
# def func(lock):
#     global n
#     # n -= 1
#     with lock:
#         tmp = n-1  # n-=1
#         time.sleep(0.1)
#         n = tmp
#
# if __name__ == '__main__':
#     l = []
#     lock = Lock()
#     for i in range(100):
#         t = Thread(target=func,args=(lock,))
#         t.start()
#         l.append(t)
#     for t in l:t.join()
#     print(n)
dis模块使用
import dis
n = 1
def func():
    n = 100
    n -= 1

dis.dis(func)

# 会出现线程不安全的两个条件
# 1.是全局变量
# 2.出现 += -=这样的操作


# 列表 字典
# 方法 l.append l.pop l.insert dic.update 都是线程安全的
# l[0] += 1
# d[k] += 1
死锁现象
# 科学家吃面问题
import time
from threading import Thread,Lock
# noodle_lock = Lock()
# fork_lock = Lock()
# 死锁不是时刻发生的,有偶然的情况整个程序都崩了
# 每一个线程之中不止一把锁,并且套着使用
# 如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制
# 而应看做一个资源
import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock(
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了'%name)
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    print('%s开始吃面'%name)
    time.sleep(0.2)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s开始吃面' % name)
    time.sleep(0.2)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('宝元',)).start()
alex拿到面条了
alex拿到叉子了
alex开始吃面
alex放下叉子了wusir拿到叉子了
alex放下面了太白拿到面条了
发生了死锁
import time
from threading import Thread,Lock
lock = Lock()
def eat1(name):
    lock.acquire()#只有一个锁,把资源都锁在一起
    print('%s拿到面条了'%name)
    print('%s拿到叉子了'%name)
    print('%s开始吃面'%name)
    time.sleep(0.2)
    lock.release()
    print('%s放下叉子了' % name)
    print('%s放下面了' % name)

def eat2(name):
    lock.acquire()
    print('%s拿到叉子了' % name)
    print('%s拿到面条了' % name)
    print('%s开始吃面' % name)
    time.sleep(0.2)
    lock.release()
    print('%s放下面了' % name)
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('宝元',)).start()
# 先临时解决
# 然后再找到死锁的原因,再去修改
from threading import RLock,Lock,Thread
# 互斥锁
    # 无论在相同的线程还是不同的线程,都只能连续acquire一次
    # 要想再acquire,必须先release
# 递归锁
    # 在同一个线程中,可以无限次的acquire
    # 但是要想在其他线程中也acquire,
    # 必须现在自己的线程中添加和acquire次数相同的release
rlock = RLock()
rlock.acquire()
rlock.acquire()
rlock.acquire()
rlock.acquire()
print('锁不住')
锁不住
lock = Lock()
lock.acquire()
print('1')
lock.acquire()
print('2')#不能打印2
rlock = RLock()
def func(num):
    rlock.acquire()
    print('aaaa',num)
    rlock.acquire()
    print('bbbb',num)
    rlock.release()
    rlock.release()

Thread(target=func,args=(1,)).start()
Thread(target=func,args=(2,)).start()
aaaa 1
bbbb 1
aaaa 2
bbbb 2
import time
noodle_lock = fork_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了'%name)
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    print('%s开始吃面'%name)
    time.sleep(0.2)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s开始吃面' % name)
    time.sleep(0.2)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('宝元',)).start()

alex拿到面条了

alex拿到叉子了

alex开始吃面

alex放下叉子了

alex放下面了

wusir拿到叉子了

wusir拿到面条了

wusir开始吃面

wusir放下面了

wusir放下叉子了

太白拿到面条了

太白拿到叉子了

太白开始吃面

太白放下叉子了

太白放下面了

宝元拿到叉子了

宝元拿到面条了

宝元开始吃面

宝元放下面了

宝元放下叉子了

信号量

import time
from threading import Semaphore,Thread

def func(name,sem):
    sem.acquire()
    print(name,'start')
    time.sleep(1)
    print(name,'stop')
    sem.release()

sem = Semaphore(5)
for i in range(20):
    Thread(target=func,args=(i,sem)).start()
# 信号量和池
# 进程池
    # 有1000个任务
    # 一个进程池中有5个进程
    # 所有的1000个任务会多次利用这五个进程来完成任务
# 信号量
    # 有1000个任务
    # 有1000个进程/线程
    # 所有的1000个任务由于信号量的控制,只能5个5个的执行

事件

from threading import Event
# 事件
# wait() 阻塞 到事件内部标识为True就停止阻塞
# 控制标识
    # set
    # clear
    # is_set

# 连接数据库
import time
import random
from threading import Thread,Event
def connect_sql(e):
    count = 0
    while count < 3:
        e.wait(0.5)
        if e.is_set():
            print('连接数据库成功')
            break
        else:
            print('数据库未连接成功')
            count += 1

def test(e):
    time.sleep(random.randint(0,3))
    e.set()

e = Event()
Thread(target=test,args=(e,)).start()
Thread(target=connect_sql,args=(e,)).start()

条件

# wait      阻塞
# notify(n) 给信号
# 假如现在有20个线程
# 所有的线程都在wait这里阻塞
# notify(n) n传了多少
# 那么wait这边就能获得多少个解除阻塞的通知

# notifyall
# acquire
# release

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
        print('****')

# 设置某个条件
# 如果满足这个条件 就可以释放线程
# 监控测试我的网速
# 20000个任务
# 测试我的网速 /系统资源
# 发现系统资源有空闲,我就放行一部分任务

定时器

from threading import Timer

def func():
    print('执行我啦')

t = Timer(3,func)
# 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
t.start()
print('主线程的逻辑')

队列

import queue

# 线程队列 线程之间数据安全
q = queue.Queue(1)
# 普通队列
q.put(1)
print(q.get())
try:
    q.put_nowait(2)
except queue.Full:
    print('您丢失了一个数据2')
print(q.get_nowait()) # 如果有数据我就取,如果没数据不阻塞而是报错
# 非阻塞的情况下
q.put(10)
print(q.get(timeout=2))
1
2
10
# 算法里 栈
lfq = queue.LifoQueue()   #
lfq.put(1)
lfq.put(2)
lfq.put(3)
print(lfq.get())
print(lfq.get())
print(lfq.get())
3
2
1
# 优先级队列,是根据第一个值的大小来排定优先级的
# ascii码越小,优先级越高
q = queue.PriorityQueue()
q.put((2,'a'))
q.put((0,'c'))
q.put((1,'b'))

print(q.get())

# 线程+队列 实现生产者消费者模型

线程池

def func(num):
    print('in %s func'%num,currentThread())
    time.sleep(random.random())
    return num**2

tp = ThreadPoolExecutor(5)
ret_l = []
for i in range(30):
    ret = tp.submit(func,i)#运行时只有5个线程运行
    ret_l.append(ret)
for ret in ret_l:
    print(ret.result())
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor  as Pool

import os
def func(num):
    print('in %s func'%num,currentThread())
    # print('in %s func'%num,os.getpid())
    time.sleep(random.random())
    return num**2
if __name__ == '__main__':
    # tp = ThreadPoolExecutor(5)
    tp = Pool(5)
    ret_l = []
    for i in range(30):
        ret = tp.submit(func,i)
        ret_l.append(ret)
    tp.shutdown()  # close + join
    for ret in ret_l:
        print(ret.result())
# 创建一个池
# 提交任务 submit
# 阻塞直到任务完成(close + join) shutdown
# 获取结果 result
# 简便用法 map
# 回调函数 add_done_callback
# 简便用法 map
import os
def func(num):
    print('in %s func'%num,currentThread())
    # print('in %s func'%num,os.getpid())
    time.sleep(random.random())
    return num**2
if __name__ == '__main__':

    # tp = ThreadPoolExecutor(5)
    tp = Pool(5)
    ret = tp.map(func,range(30))
    for i in ret:
        print(i)
# 回调函数 add_done_callback
def func1(num):
    print('in func1 ',num,currentThread())
    return num*'*'

def func2(ret):
    print('--->',ret.result(),currentThread())
tp = Pool(5)
print('主 : ',currentThread())
for i in range(10):
    tp.submit(func1,i).add_done_callback(func2)
# 回调函数收到的参数是需要使用result()获取的
# 回调函数是由谁执行的? 主线程
# 相关概念
    # 进程
    # 线程
        # GIL
# 很多模型
#     进程 锁(递归锁 互斥锁)池(cpu的1-2倍)队列
#     线程 锁(递归锁 互斥锁)池(cpu个数的5倍)队列  其他模型
原文地址:https://www.cnblogs.com/pythonz/p/10110715.html