Day36 python基础--并发编程基础5

一,线程的锁

  Q1:线程为什么要有锁

    1.线程之间的数据安全问题:

      +=,-=,*=,/=赋值操作不安全,如果涉及这些这些一定要加锁

    2.pop.append是线程安全

    3.队列也是数据安全的

    4.多线程中,别在线程中操作全局变量

    5.可以使用dic模块中的方法来查看某个操作对应的cpu指令

  

  互斥锁与递归锁

#互斥锁Lock
from threading import Thread,Lock
n = 1500000
def func(lock):
    global n
    for i in range(1500000):
        with lock:   #如果不加锁,则会出现数据不安全的问题
            n -= 1

def func2(lcok):
    global n
    for i in range(1500000):
        lock.acquire()   #如果不加锁,则会出现数据不安全的问题
        n += 1
        lock.release()

if __name__ == '__main__':
    t_lst = []
    lock = Lock()
    for i in range(10):
        t = Thread(target=func,args=(lock,))
        t2 = Thread(target=func2,args=(lock,))
        t.start()
        t2.start()
        t_lst.append(t)
        t_lst.append(t2)
    for t in t_lst:
        t.join()
    print('-->',n)
#互斥锁的死锁现象:
    #一共有两把锁
    #线程之间是异步的
        #操作的时候,抢到一把锁之后还要再去抢第二把锁
        #一个线程抢到一把锁
        #另一个线程抢到了另一把锁
import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了'%name)
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    print('%s吃面'%name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s放下叉子'%name)
    noodle_lock.release()
    print('%s放下面'%name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s吃面'%name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s放下面'%name)
    fork_lock.release()
    print('%s放下叉子' % name)

if __name__ == '__main__':
    name_list = ['alex','wusir']
    name_list2 = ['nezha','yuan']
    for name in name_list:
        Thread(target=eat1,args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2,args=(name,)).start()

  递归锁:

    1.递归锁可以解决互斥锁的死锁问题

      当多个线程抢占资源时,使用一把递归锁,解决死锁问题

    2.递归锁的优缺点:

      递归锁并不是一个好的解决方法

      死锁现象的发生不是互斥锁的问题,而是程序员的逻辑问题导致的

      递归锁能够快速的解决死锁问题

    3.递归锁的特点:

      当程序发生死锁的时候,能够迅速恢复服务,使用递归锁替换互斥锁

      在后续修复中,逐步把递归锁替换成互斥锁:完善代码逻辑,提高代码的效率

    4.递归锁的逻辑:

      多个线程之间,一个线程用完一个资源再到另外一个线程取使用

      先释放一个资源,再去获取一个资源的锁    

#递归锁RLock
from threading import Thread,RLock
rlock = RLock()
def func(name):
    rlock.acquire()
    print(name,1)
    rlock.acquire()
    print(name,2)
    rlock.acquire()
    print(name,3)
    rlock.acquire()
    print(name,4)
    rlock.release()
    rlock.release()
    rlock.release()
    rlock.release()
if __name__ == '__main__':
    for i in range(10):
        Thread(target=func,args=('name%s'%i,)).start()
#科学家吃面问题,把两把互斥锁改为一把递归锁修复死锁问题
import time
from threading import Thread,RLock
# noodle_fork_lock = Lock()
noodle_lock = fork_lock = RLock()
def eat1(name):
    # noodle_fork_lock.acquire()
    noodle_lock.acquire()
    print('%s拿到面条了'%name)
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    print('%s吃面'%name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s放下叉子'%name)
    noodle_lock.release()
    # noodle_fork_lock.release()
    print('%s放下面'%name)
    
def eat2(name):
    # noodle_fork_lock.acquire()
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s吃面'%name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s放下面' % name)
    fork_lock.release()
    # noodle_fork_lock.release()
    print('%s放下叉子' % name)

if __name__ == '__main__':
    name_list = ['alex','wusir']
    name_list2 = ['nezha','yuan']
    for name in name_list:
        Thread(target=eat1,args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2,args=(name,)).start()

二,线程的信号量

  原理为:锁+计数器

  1.并没有减少线程需要的并发数量

  2.造成多个线程在等待资源的释放

#信号量Semaphore
import time
from threading import Semaphore,Thread
def func(index,sem):
    sem.acquire()
    print(index)
    time.sleep(1)
    sem.release()
if __name__ == '__main__':
    sem = Semaphore(5)
    for i in range(10):
        Thread(target=func,args=(i,sem)).start()

三,线程的事件

  线程与线程之间同步执行,一个线程等待wait()的信号,并由信号决定,线程释放执行,另一个线程负责决定信号的状态,主要用于防止线程长时间或一直处于阻塞状态

#事件:Event

#
实例:检测数据库连接 #先来测试一下数据库是否能被连接 #网通不通 #用户名,密码是否正确 #wait()等待 事件内的信号变成True #set() 把信号变成True #clear() 把信号变成False #is_set() 查看信号状态是否为True import time import random from threading import Event,Thread def check(e): print('开始检测数据库连接') time.sleep(random.randint(1,5)) #检测数据库连接的操作类比 e.set() #成功了 def connect(e): for i in range(3): e.wait(timeout=0.5) if e.is_set(): print('数据库连接成功') break else: print('尝试连接数据库%s次失败'%(i+1)) else: raise TimeoutError #若超过连接次数,则主动抛出异常

if __name__ == '__main__': e = Event() Thread(target=check,args=(e,)).start() Thread(target=connect,args=(e,)).start()

四,线程的条件

  1.notify:控制流量,通知有多少人可以通过了,即设置多少个线程可执行,在使用前后都需要加锁

  2.wait:在门口等待的所有人,即处于阻塞状态的线程,使用前后都需要加锁

#条件:Condition
from threading import Condition,Thread
def func(con,index):
    print('%s在等待'%index)
    con.acquire()
    print('%s在wait' % index)
    con.wait()
    print('% sdo something'%index)
    con.release()

if __name__ == '__main__':
    con = Condition()
    for i in range(10):
        t = Thread(target=func,args=(con,i))
        t.start()
    # con.acquire()
    # con.notify_all() #可以设置流量为不限制
    # con.release()
    count = 10
    while count > 0:
        num = int(input('>>>'))
        con.acquire()
        con.notify(num)
        count -= num
        con.release()

五,线程的定时器

  1.子线程延迟执行不影响主程序的运行

  2.可以实现 linux操作系统上的定时任务的工具 crontab

#定时器:Timer
from threading import Timer
def func():
    print('执行我啦')
t = Timer(5,func)  #延迟5秒执行
t.start()
print('主线程')

六,线程的queue

  1.队列:queue

    线程安全,一般用于排队相关逻辑

    保证qps每秒钟接收请求数

    帮助维护程序的响应的顺序

  应用场景,服务器响应大量并发连接,将来不及响应的连接,存入队列中,并依次响应

#队列:queue
import queue
q = queue.Queue()
#队列的方法
q.put()
q.get()
q.put_nowait()
q.get_nowait()
q.full() #不准确
q.empty() #不准确

  2.栈:LifoQueue

    维护先进后出的顺序

    应用场景:完成算法

#栈:LifoQueue
from queue import LifoQueue
lq = LifoQueue()
#栈的方法
lq.put()
lq.get()
#实例:线程中的生产者消费者模型
from threading import Thread
import queue
import time

def consumer(q,name):
    while True:
        ret = q.get()
        if ret is None:break
        print(name,'吃了',ret)

def producer(q,name,food):
    for i in range(10):
        time.sleep(0.1)
        q.put(food+str(i))
        print(name,'生产了',food+str(i))

if __name__ == '__main__':
    q1 = queue.Queue()   #用于线程间的IPC
    c = Thread(target=consumer,args=(q1,'wusir'))
    p = Thread(target=producer,args=(q1,'alex','饭菜'))
    c.start()
    p.start()
    p.join()
    q1.put(None)

  3.优先级队列:PriorityQueue

    按照优先级条件决定谁先出队列

    队列中元素,数据类型需要一致,并且能狗满足比较大小的条件,否则报错

#优先级队列:PriorityQueue
from
queue import PriorityQueue pq = PriorityQueue() pq.put((15,'abc')) pq.put((12,'def')) #优先级按第二个元素的第一个字母的ascii码大小 pq.put((12,'aaa')) #当第一个元素优先级一样是 pq.put((5,'ghi')) # pq.put(5) # pq.put(12) # pq.put(15) print(pq.get()) print(pq.get()) print(pq.get())

七、总结

# 线程安全的问题
# 数据类型是否线程安全
# += -=不安全
# set list dict 基础方法线程安全
#
# 某块数据安全
#     logging模块 是线程安全的
#
# 互斥锁和递归锁之间的区别
#     出现死锁问题的本质
#         两把锁,锁两个资源
#         分别被不同线程抢占,导致死锁
#     解决方法;
#         速度上
#         逻辑上
# 线程中用到的模块
#     锁,互斥锁,递归锁
#     信号量
#     事件
#     队列

八,线程中local的概念

  多个线程之间使用threading.local对象,可以实现多个线程之间的数据隔离

import time
import random
from threading import local,Thread

loc = local()
def func2():
    global loc
    print(loc.name,loc.age)


def func(name,age):
    global loc
    loc.name = name
    loc.age = age
    time.sleep(random.random())
    func2()

Thread(target=func,args=('alex',40)).start()
Thread(target=func,args=('boss_jin',38)).start()
原文地址:https://www.cnblogs.com/lianyeah/p/9700171.html