python并发编程之协程

---恢复内容开始---

一、join方法

(1)开一个主线程

from threading import Thread,currentThread
import time
def walk():
    print('%s is running'%currentThread().getName())
    time.sleep(2)
    print('%s is done'%currentThread().getName())
if __name__ == '__main__':
    # for i in range(10):
        p=Thread(target=walk)
        p.start()
        p.join()
        print('')  #主线程在等p.join执行
开一个主线程

(2)开多个主线程,并发运行

from threading import Thread,currentThread
import time
def walk():
    print('%s is running'%currentThread().getName())
    time.sleep(2)
    print('%s is done'%currentThread().getName())
if __name__ == '__main__':
    l=[]
    for i in range(10):
        p=Thread(target=walk)
        l.append(p)
        p.start()
    #   p.join()       #在p.start() 跟p.join()就会变为串行,一个一个的运行
    for p in l:
        p.join()
    print('')
开多个主线程

(3)并发运行 互斥锁之锁局部的 ,用所只锁住你对共享数据修改的部分

加锁:

from threading import Thread,currentThread,Lock
import time
n=100
def walk():
  # 并发运行
    time.sleep(2)
    global n
    mutex.acquire()
      # 串行
    temp=n
    time.sleep(0.01)
    n=temp-1    #数据可能同是减1,可能数据会乱
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    l=[]
    start = time.time()
    for i in range(100):
        p=Thread(target=walk)
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print('n:%s run_time : %s' %(n,stop - start))
View Code

不加锁:

from threading import Thread,currentThread,Lock
import time
n=100
def walk():
    time.sleep(2)
    global n
    # mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1    #数据可能同是减1,可能数据会乱
    # mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    start = time.time()
    for i in range(10):
        p=Thread(target=walk)
        p.start()
        p.join()
    stop = time.time()
    print('n:%s run_time : %s' %(n,stop - start))  #至少21秒
并发运行,不加锁
主线程运行完毕是在所有线程所在的进程内所有非守护线程运行完毕才运行

二、GIL本质是一把互斥锁,将并发转成串行,以此来控制同一时间内共享数据只能被一个任务修改,

     进而保证数据的安全

from threading import Thread,currentThread,Lock
import time
n=100
def work():
    time.sleep(2)
    global n
    time.sleep(0.5)
    mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    t1=Thread(target=work)
    t2=Thread(target=work)
    t3=Thread(target=work)
    t1.start()
    t2.start()
    t3.start()
View Code

三、多线程性能测试

(1)'''
多进程
优点:可以利用多核
缺点:开销大

多线程
优点:开销小
缺点:不可以利用多核
'''
from multiprocessing import Process
from threading import Thread
import time
def work():
    res=0
    for i in range(10000000):
        res+=i

if __name__ == '__main__':
    l=[]
    start=time.time()
    for i in range(4):
        # p=Process(target=work) #0.9260530471801758
        p=Thread(target=work) #0.9260530471801758
        l.append(p)
        p.start()

    for p in l:
        p.join()

    stop=time.time()
    print('%s' %(stop-start))
计算机密集型--开启多进程
from multiprocessing import Process
from threading import Thread
import time
def work():
    time.sleep(2)

if __name__ == '__main__':
    l=[]
    start=time.time()
    for i in range(400):
        p=Process(target=work)
        # p=Thread(target=work)
        l.append(p)
        p.start()

    for p in l:
        p.join()

    stop=time.time()
    print('%s' %(stop-start))
I/O密集型---开启多线程

(2)应用:

             多线程用于IO密集型,如socket,爬虫,web
            多进程用于计算密集型,如金融分析

四、死锁与递归锁

死锁:

from threading import Thread,RLock
import time
mutexA=RLock()
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()
        print('%s 拿到A锁'%self.name)
        mutexA.acquire()
        print('%s 拿到B锁' % self.name)
        mutexA.release()
        mutexA.release()
    def f2(self):
        mutexA.acquire()
        print('%s 拿到A锁' % self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s 拿到B锁' % self.name)
        mutexA.release()
        mutexA.release()
if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
View Code

解决死锁的方法

递归锁:用RLock代替Lock

 1 from threading import Lock,Thread,RLock
 2 import time
 3 # mutexA=Lock()
 4 # mutexB=Lock()
 5 mutexB=mutexA=RLock()
 6 class MyThread(Thread):
 7     def run(self):
 8         self.f1()
 9         self.f2()
10     def f1(self):
11         mutexA.acquire()
12         print('33[32m%s 拿到A锁' %self.name)
13         mutexB.acquire()
14         print('33[45m%s 拿到B锁' %self.name)
15         mutexB.release()
16         mutexA.release()
17     def f2(self):
18         mutexB.acquire()
19         print('33[32m%s 拿到B锁' %self.name)
20         time.sleep(1)
21         mutexA.acquire()
22         print('33[45m%s 拿到A锁' %self.name)
23         mutexA.release()
24         mutexB.release()
25 if __name__ == '__main__':
26     for i in range(10):
27         t=MyThread()
28         t.start()
递归锁

五、信号量

信号量和进程一样

信号量就是一把锁,可以有多把钥匙

from threading import Thread,Semaphore,currentThread
import time,random
sm=Semaphore(5)
def task():
    sm.acquire()
    print('%s 上厕所' %currentThread().getName())
    time.sleep(random.randint(1,3))
    print('%s 走了' %currentThread().getName())
    sm.release()
if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=task)
        t.start()
View Code

六、事件

Event

vent.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
from threading import Thread,currentThread,Event
import time
e=Event()
def traffic_ligths():
    time.sleep(1)
    e.set()
def car():

    print('33[45m%s等'%currentThread().getName())
    e.wait()
    print('33[43m%s开'%currentThread().getName())
if __name__ == '__main__':
    print('绿灯')
    for i in range(10):
        p=Thread(target=car)
        p.start()
    # print('绿灯')
    time.sleep(5)
    print('红灯')
    traffic_ligth=Thread(target=traffic_ligths)
    traffic_ligth.start()
红绿灯事列

from threading import Thread, currentThread, Event
import time
e = Event()
def conn_mysql():
    count = 1
    while not e.is_set():
        if count > 3:
            raise ConnectionError('尝试链接的次数太多了')
        print('33[45m%s 第%s次尝试' % (currentThread().getName(), count))
        e.wait(timeout=1)
        count += 1
    print('33[45m%s 开始链接' %currentThread().getName())
def check_myql():
    print('33[45m%s 开始检测 my_sql....' %currentThread().getName())
    time.sleep(2)
    e.set()
if __name__ == '__main__':
    for i in range(2):
        p = Thread(target=conn_mysql)
        p.start()
    p = Thread(target=check_myql)
    p.start()
链接——sql

七、定时器

定时器,是n秒后执行操作

rom threading import Timer


def hello(n):
    print("hello, world",n)


t = Timer(3, hello,args=(123,))
t.start()  # after 1 seconds, "hello, world" will be printed
定时器

八、线程queue

queue队列,用法与进程queue一样

q=queue.queue()  先进先出

q=queue.Queue(3) #先进先出
q.put('first')
q.put('second')
q.put('third')
# q.put('fourth')

print(q.get())
print(q.get())
print(q.get())

q=queue.LifoQueue()   先进后出

q=queue.LifoQueue() #先进后出
q.put('first')
q.put('second')
q.put('third')
# q.put('fourth')

print(q.get())
print(q.get())
print(q.get())
put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
import queue
q=queue.PriorityQueue()
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())

---恢复内容结束---

原文地址:https://www.cnblogs.com/mengqingjian/p/7463527.html