递归锁,队列

1.线程为什么要有锁:

  全局解释器锁GIL 不能完全确保数据的安全(时间片轮转法)

  线程之间等的数据安全问题:

  +=,-=赋值操作不安全

  不涉及赋值操作的数据是安全的

不安全:

from threading import Lock,Thread
n = 1500000
def func():
    global n
    for i in range(1500000):
        n -= 1
def func2():
    global n
    for i in range(1500000):
        n += 1
if __name__ == '__main__':
    t_lst = []
    for i in range(10):
        t = Thread(target=func)
        t2 = Thread(target=func2)
        t.start()
        t2.start()
        t_lst.append(t)
        t_lst.append(t2)
    for t in t_lst:
        t.join()
    print('>>>>',n)

  加锁:

from threading import Lock,Thread
n = 150000
def func(lock):
    global n
    for i in range(150000):
        lock.acquire()
        n -= 1
        lock.release()
def func2(lock):
    global n
    for i in range(150000):
        lock.acquire()
        n += 1
        lock.release()
if __name__ == '__main__':
    lock = Lock()
    t_lst = []
    for i in range(10):
        t = Thread(target=func,args=(lock,))
        t2 = Thread(target=func2,args=(lock,))
        t.start()
        t2.start()
        t_lst.append(t)
        t_lst.append(t2)
    for t in t_lst:
        t.join()
    print('>>>>',n)

  

2:互斥锁与递归锁

  死锁现象:

    两把锁

    异步的

    操作的时候,一个线程抢到一把锁之后还要再去抢第二把锁

    一个线程抢到一把锁

    另一个线程抢到了另一把锁

3:递归锁可以解决互斥锁的问题

  互斥锁:

    两把锁

    多个线程抢

  递归锁:

    一把钥匙

    多个线程抢

递归锁好不好?

  递归锁并不是一个好的解决方案

  死锁现象的发生不是互斥锁的问题

  而是程序员的逻辑有问题导致的

  递归锁能够快速的解决死锁问题

递归锁:

  迅速恢复服务 ,递归锁替换互斥锁

  在接下来的时间中慢慢把递归锁替换成互斥锁

    完善代码的逻辑

    提高代码的效率

多个线程之间,用完一个资源再用另一个资源

先释放一个资源,再去获取一个资源的锁

死锁现象:

import time
from threading import Thread,Lock

noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    print('%s吃面' % name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s放下叉子' % name)
    noodle_lock.release()
    print('%s放下面' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s吃面' % name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s放下面' % name)
    fork_lock.release()
    print('%s放下叉子' % name)

if __name__ == '__main__':
    name_list = ['liming','lining']
    name_list2 = ['lijing', 'liling']
    for name in name_list:
        Thread(target=eat1,args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2,args=(name,)).start()

  递归锁

from threading import Thread,RLock
rlock = RLock()
def func(name):
    rlock.acquire()
    print(name,1)
    rlock.acquire()
    print(name,2)
    rlock.acquire()
    print(name,3)
    rlock.release()
    rlock.release()
    rlock.release()
for i in range(10):
    Thread(target=func,args=('liming%s'%i,)).start()

  解决死锁问题,互斥锁修改位递归锁

import time
from threading import Thread,RLock#Lock

# noodle_lock = Lock()
# fork_lock = Lock()
fork_lock = noodle_lock = RLock() #互斥锁改为递归锁
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    print('%s吃面' % name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s放下叉子' % name)
    noodle_lock.release()
    print('%s放下面' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到面条了' % name)
    print('%s吃面' % name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s放下面' % name)
    fork_lock.release()
    print('%s放下叉子' % name)

if __name__ == '__main__':
    name_list = ['liming','lining']
    name_list2 = ['lijing', 'liling']
    for name in name_list:
        Thread(target=eat1,args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2,args=(name,)).start()

  

4:信号量

import time
from threading import Semaphore,Thread

def func(index,sem):
sem.acquire()
print(index)
time.sleep(1)
sem.release()

if __name__ == '__main__':
sem = Semaphore(3)
for i in range(30):
Thread(target=func,args=(i,sem)).start()

  

5.事件:检测数据库连接

  wait() 等待事件内的信号变为True

  set()  把信号变为True

  clear() 把信号变为False

  is_set 查看信号是否Ture

import time
import random
from threading import Event,Thread
def check():
    print('开始检测数据库连接')
    time.sleep(random.randint(1,5))  #模拟,真实测试时替换
    e.set() #

def connect(e):
    for i in range(3):
        e.wait(0.5)
        if e.is_set():
            print('数据库连接成功')
        else:
            print('尝试连接数据库%s次失败' % (i+1))
    else:
        raise TimeoutError
e = Event()
Thread(target=connect,args=(e,)).start()
Thread(target=check,args=(e,)).start()

 6.条件

  notify 控制流量, 通知有多少人可以通过了

  wait 在门口等待的所有人

  wait 使用前后都需要加锁

  notify 使用前后都要加锁

from threading import Condition,Thread
def func(con,index):
    print('%s在等待' % index)
    con.acquire()
    con.wait()
    print('%sdo something' % index)
    con.release()
con = Condition()
for i in range(100):
    t = Thread(target=func,args=(con,i))
    t.start()
count = 100
while count > 0:
    num = int(input('>>>:'))
    con.acquire()
    con.notify(num)
    count -= num
    con.release()

# con.acquire()
# con.notify_all() #一次放完
# con.release()

7.定时器:

from threading import Timer
def func():
    print('执行了')
t = Timer(5,func) #5妙后执行func这个函数
t.start()
print('主线程')

8.队列:

#线程是安全 队列 :排队相关的逻辑(先进先出)
import queue
q = queue.Queue()
q.put()
q.get()
q.put_nowait()
q.get_nowait()

  qps 每秒接受的请求数:

  帮助维持程序相应的顺序

后进先出

from queue import LifoQueue  #后进先出
#栈  完成算法
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)
lq.put('a')
lq.put('b')
print(lq.get())
print(lq.get())
print(lq.get())

优先级队列:应用VIP.

#优先级队列,
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((15,'abc'))
pq.put((5,'ghj'))
pq.put((25,'ftr'))
pq.put((25,'atr'))
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())

  

from queue import PriorityQueue
pq = PriorityQueue()
pq.put(35)
pq.put(15)
pq.put(25)
print(pq.get())
print(pq.get())
print(pq.get())

  

from queue import PriorityQueue
pq = PriorityQueue()
pq.put('c')
pq.put('a')
pq.put('b')
print(pq.get())
print(pq.get())
print(pq.get())

  

  

  

    

    

原文地址:https://www.cnblogs.com/lijinming110/p/9703737.html