python入门三十天---锁————-条件变量,同步锁,死锁,信号量

线程threeding:   + join

练习:

 1 #!/usr/bin/env python3
 2 #-*- coding:utf-8 -*-
 3 '''
 4 Administrator 
 5 2018/8/10 
 6 '''
 7 
 8 import threading
 9 from time import time,sleep,asctime,ctime
10 
11 def music(name):
12     for i in range(2):
13         print("......music %s .. %s is doing         %s "%(i,name,ctime()))
14         sleep(2)
15         print(".on mygod.....music %s.. %s is end    %s" %(i,name,ctime()))
16 def move(name):
17     for i in range(2):
18 
19         print("......move%s .. %s is doing           %s   "%(i,name,ctime()))
20         sleep(3)
21         print(".on mygod..%s...move .. %s is end     %s" %(i,name,ctime()) )
22 
23 threads=[]
24 t1=threading.Thread(target=music,args=("七里香",))
25 threads.append(t1)
26 t2=threading.Thread(target=move,args=("我不是药神",))
27 threads.append(t2)
28 
29 if __name__=="__main__":
30     for t in threads:
31         t.start()
32     t.join()
33 
34 
35     print("all is  end.")
 1 import threading
 2 import time
 3  
 4  
 5 class MyThread(threading.Thread):
 6     def __init__(self,num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9  
10     def run(self):#定义每个线程要运行的函数
11  
12         print("running on number:%s" %self.num)
13  
14         time.sleep(3)
15  
16 if __name__ == '__main__':
17  
18     t1 = MyThread(1)
19     t2 = MyThread(2)
20     t1.start()
21     t2.start()
 1 import threading
 2 from time import ctime,sleep
 3 import time
 4 
 5 def music(func):
 6     for i in range(2):
 7         print ("Begin listening to %s. %s" %(func,ctime()))
 8         sleep(4)
 9         print("end listening %s"%ctime())
10 
11 def move(func):
12     for i in range(2):
13         print ("Begin watching at the %s! %s" %(func,ctime()))
14         sleep(5)
15         print('end watching %s'%ctime())
16 
17 threads = []
18 t1 = threading.Thread(target=music,args=('七里香',))
19 threads.append(t1)
20 t2 = threading.Thread(target=move,args=('阿甘正传',))
21 threads.append(t2)
22 
23 if __name__ == '__main__':
24 
25     for t in threads:
26         # t.setDaemon(True)
27         t.start()
28         # t.join()
29     # t1.join()
30     t2.join()########考虑这三种join位置下的结果?
31     print ("all over %s" %ctime())
View Code

setDaemon(True):

      将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦 

 1 #!/usr/bin/env python3
 2 #-*- coding:utf-8 -*-
 3 '''
 4 Administrator 
 5 2018/8/10 
 6 '''
 7 
 8 import threading
 9 from time import time,sleep,asctime,ctime
10 
11 def music(name):
12     for i in range(2):
13         print("......music %s .. %s is doing         %s "%(i,name,ctime()))
14         sleep(2)
15         print(".on mygod.....music %s.. %s is end    %s" %(i,name,ctime()))
16 def move(name):
17     for i in range(2):
18 
19         print("......move%s .. %s is doing           %s   "%(i,name,ctime()))
20         sleep(3)
21         print(".on mygod..%s...move .. %s is end     %s" %(i,name,ctime()) )
22 
23 threads=[]
24 t1=threading.Thread(target=music,args=("七里香",))
25 threads.append(t1)
26 t2=threading.Thread(target=move,args=("我不是药神",))
27 threads.append(t2)
28 
29 if __name__=="__main__":
30     for t in threads:
31         t.setDaemon(True)#守护线程
32         t.start()
33     # t.join()
34     print("all is  end.                             %s"%ctime())
View Code
 1 #!/usr/bin/env python3
 2 #-*- coding:utf-8 -*-
 3 '''
 4 Administrator 
 5 2018/8/10 
 6 '''
 7 
 8 import threading
 9 from time import time,sleep,asctime,ctime
10 
11 def music(name):
12     for i in range(2):
13         print("......music %s .. %s is doing         %s "%(i,name,ctime()))
14         sleep(2)
15         print(".on mygod.....music %s.. %s is end    %s" %(i,name,ctime()))
16 def move(name):
17     for i in range(2):
18 
19         print("......move%s .. %s is doing           %s   "%(i,name,ctime()))
20         sleep(3)
21         print(".on mygod..%s...move .. %s is end     %s" %(i,name,ctime()) )
22 
23 threads=[]
24 t1=threading.Thread(target=music,args=("七里香",))
25 threads.append(t1)
26 t2=threading.Thread(target=move,args=("我不是药神",))
27 threads.append(t2)
28 
29 if __name__=="__main__":
30     t2.setDaemon(True)  # 守护线程
31     for t in threads:
32         # t.setDaemon(True)#守护线程
33         t.start()
34     # t.join()
35     print("all is  end.                             %s"%ctime())
36 
37 """
38 ......music 0 .. 七里香 is doing         Fri Aug 10 12:21:46 2018 
39 ......move0 .. 我不是药神 is doing           Fri Aug 10 12:21:46 2018   
40 all is  end.                             Fri Aug 10 12:21:46 2018
41 .on mygod.....music 0.. 七里香 is end    Fri Aug 10 12:21:48 2018
42 ......music 1 .. 七里香 is doing         Fri Aug 10 12:21:48 2018 
43 .on mygod..0...move .. 我不是药神 is end     Fri Aug 10 12:21:49 2018
44 ......move1 .. 我不是药神 is doing           Fri Aug 10 12:21:49 2018   
45 .on mygod.....music 1.. 七里香 is end    Fri Aug 10 12:21:50 2018
46 .on mygod..1...move .. 我不是药神 is end     Fri Aug 10 12:21:52 2018
47 
48 -----------------------t2.setDaemon(True)----------------------------------------------
49 ......music 0 .. 七里香 is doing         Fri Aug 10 12:20:30 2018 
50 ......move0 .. 我不是药神 is doing           Fri Aug 10 12:20:30 2018   
51 all is  end.                             Fri Aug 10 12:20:30 2018
52 .on mygod.....music 0.. 七里香 is end    Fri Aug 10 12:20:32 2018
53 ......music 1 .. 七里香 is doing         Fri Aug 10 12:20:32 2018 
54 .on mygod..0...move .. 我不是药神 is end     Fri Aug 10 12:20:33 2018
55 ......move1 .. 我不是药神 is doing           Fri Aug 10 12:20:33 2018   
56 .on mygod.....music 1.. 七里香 is end    Fri Aug 10 12:20:34 2018     ####   t1结束后, 程序就立即结束
57 
58 """
守护进程

join():

       在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

其它方法

 1 thread 模块提供的其他方法:
 2 # threading.currentThread(): 返回当前的线程变量。
 3 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
 4 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
 5 # 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
 6 # run(): 用以表示线程活动的方法。
 7 # start():启动线程活动。
 8 # join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
 9 # isAlive(): 返回线程是否活动的。
10 # getName(): 返回线程名。
11 # setName(): 设置线程名。


2018-08-1013:17:35

  • GIL 导致python没有办法真正利用系统的多核CUP


同步锁

ThreadLocal

 1 import time,threading
 2 
 3 
 4 
 5 
 6 num=100
 7 def addNum():
 8     global num
 9     # num-=1
10     temp=num
11     print("ok",num)
12     num=temp-1
13 
14  
15 thread_list=[]
16 for i in range(100):
17     t=threading.Thread(target=addNum)
18     t.start()
19     thread_list.append(t)
20     
21 for t in thread_list:
22     t.join()
23     
24     
25 print("final results",num)
 1 import time,threading
 2 
 3 start=time.time()
 4 num=100
 5 r=threading.Lock()
 6 
 7 def addNum():
 8     global num
 9     # num-=1
10     r.acquire()  #加锁
11     temp=num
12     time.sleep(0.1)
13     num=temp-1
14     r.release()  #解锁
15 
16 
17 thread_list=[]
18 for i in range(100):
19     t=threading.Thread(target=addNum)
20     t.start()
21     thread_list.append(t)
22     
23 for t in thread_list:
24     t.join()
25     
26     
27 print("final results",num)
28 end=time.time()
29 print(end-start)

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦:

 1 def process_student(name):
 2     std = Student(name)
 3     # std是局部变量,但是每个函数都要用它,因此必须传进去:
 4     do_task_1(std)
 5     do_task_2(std)
 6 
 7 def do_task_1(std):
 8     do_subtask_1(std)
 9     do_subtask_2(std)
10 
11 def do_task_2(std):
12     do_subtask_2(std)
13     do_subtask_2(std)

每个函数一层一层调用都这么传参数那还得了?用全局变量?也不行,因为每个线程处理不同的Student对象,不能共享。

如果用一个全局dict存放所有的Student对象,然后以thread自身作为key获得线程对应的Student对象如何?

 1 global_dict = {}
 2 
 3 def std_thread(name):
 4     std = Student(name)
 5     # 把std放到全局变量global_dict中:
 6     global_dict[threading.current_thread()] = std
 7     do_task_1()
 8     do_task_2()
 9 
10 def do_task_1():
11     # 不传入std,而是根据当前线程查找:
12     std = global_dict[threading.current_thread()]
13     ...
14 
15 def do_task_2():
16     # 任何函数都可以查找出当前线程的std变量:
17     std = global_dict[threading.current_thread()]

    ...

这种方式理论上是可行的,它最大的优点是消除了std对象在每层函数中的传递问题,但是,每个函数获取std的代码有点丑。

有没有更简单的方式?

ThreadLocal应运而生,不用查找dictThreadLocal帮你自动做这件事:

 1 import threading
 2 
 3 # 创建全局ThreadLocal对象:
 4 local_school = threading.local()
 5 
 6 def process_student():
 7     # 获取当前线程关联的student:
 8     std = local_school.student
 9     print('Hello, %s (in %s)' % (std, threading.current_thread().name))
10 
11 def process_thread(name):
12     # 绑定ThreadLocal的student:
13     local_school.student = name
14     process_student()
15 
16 t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
17 t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
18 t1.start()
19 t2.start()
20 t1.join()
21 t2.join()

执行结果:

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

小结

一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每个线程中都获取这个全局变量
 6     # num-=1
 7 
 8     temp=num
 9     print('--get num:',num )
10     #time.sleep(0.1)
11     num =temp-1 #对此公共变量进行-1操作
12 
13 
14 num = 100  #设定一个共享变量
15 thread_list = []
16 for i in range(100):
17     t = threading.Thread(target=addNum)
18     t.start()
19     thread_list.append(t)
20 
21 for t in thread_list: #等待所有线程执行完毕
22     t.join()
23 
24 print('final num:', num )

注意:

1:  why num-=1没问题呢?这是因为动作太快(完成这个动作在切换的时间内)

2: if sleep(1),现象会更明显,100个线程每一个一定都没有执行完就进行了切换,我们说过sleep就等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99.

 

多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?

有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。

我们可以通过同步锁来解决这种问题

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每个线程中都获取这个全局变量
 6     # num-=1
 7     lock.acquire()
 8     temp=num
 9     print('--get num:',num )
10     #time.sleep(0.1)
11     num =temp-1 #对此公共变量进行-1操作
12     lock.release()
13 
14 num = 100  #设定一个共享变量
15 thread_list = []
16 lock=threading.Lock()
17 
18 for i in range(100):
19     t = threading.Thread(target=addNum)
20     t.start()
21     thread_list.append(t)
22 
23 for t in thread_list: #等待所有线程执行完毕
24     t.join()
25 
26 print('final num:', num )

问题解决,但

请问:同步锁与GIL的关系?

Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。

GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。
 
那我的同步锁也是保证同一时刻只有一个线程被执行,是不是没有GIL也可以?是的;那要GIL有什么鸟用?你没治;

线程死锁和递归锁

      在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:

 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lockA.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)
 8         lockB.acquire()
 9         print(self.name,"gotlockB",time.ctime())
10         lockB.release()
11         lockA.release()
12 
13     def doB(self):
14         lockB.acquire()
15         print(self.name,"gotlockB",time.ctime())
16         time.sleep(2)
17         lockA.acquire()
18         print(self.name,"gotlockA",time.ctime())
19         lockA.release()
20         lockB.release()
21     def run(self):
22         self.doA()
23         self.doB()
24 if __name__=="__main__":
25 
26     lockA=threading.Lock()
27     lockB=threading.Lock()
28     threads=[]
29     for i in range(5):
30         threads.append(myThread())
31     for t in threads:
32         t.start()
33     for t in threads:
34         t.join()#等待线程结束,后面再讲。

解决办法:使用递归锁,将

lockA=threading.Lock()
lockB=threading.Lock()<br data-filtered="filtered">#--------------<br data-filtered="filtered">lock=threading.RLock()

  

为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

应用

 1 import time
 2 
 3 import threading
 4 
 5 class Account:
 6     def __init__(self, _id, balance):
 7         self.id = _id
 8         self.balance = balance
 9         self.lock = threading.RLock()
10 
11     def withdraw(self, amount):
12 
13         with self.lock:
14             self.balance -= amount
15 
16     def deposit(self, amount):
17         with self.lock:
18             self.balance += amount
19 
20 
21     def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的场景
22 
23         with self.lock:
24             interest=0.05
25             count=amount+amount*interest
26 
27             self.withdraw(count)
28 
29 
30 def transfer(_from, to, amount):
31 
32     #锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的
33      _from.withdraw(amount)
34 
35      to.deposit(amount)
36 
37 
38 
39 alex = Account('alex',1000)
40 yuan = Account('yuan',1000)
41 
42 t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
43 t1.start()
44 
45 t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
46 t2.start()
47 
48 t1.join()
49 t2.join()
50 
51 print('>>>',alex.balance)
52 print('>>>',yuan.balance)
View Code



 死锁现象及其解决方案

 1 import threading
 2 
 3 class Account:
 4     def __init__(self,name,money,r):
 5         self.name=name
 6         self.balance=money
 7         
 8         
 9         
10     def withdraw(self,num):
11         r.acquire()
12         self.balance-=num
13         r.release()
14     def repay(self,num):
15         r.acquire()
16         self.balance+=num
17         r.release()
18     def abc(self,num):
19         r.acquire()
20         #当出现以下这种情况的时候,就可能会出现死锁现象。此时就需要使用 这个解决方案
21         self.withdraw()
22         self.balance+=num
23         r.release()
24 def transer(_from,to,count):
25     
26     # r.acquire()
27     _from.withdraw(count)
28     to.repay(count)
29     # r.release() 
30 r=threading.RLock()
31 a1=Account("alex",1000,r)
32 a2=Account("xiaohu",2000,r)
33 print("%s账户余额:%s"%(a1.name,a1.balance))
34 
35 
36 print("%s账户余额:%s"%(a2.name,a2.balance))
37 
38 t1=threading.Thread(target=transer,args=(a1,a2,100))
39 t2=threading.Thread(target=transer,args=(a2,a1,400))
40 
41 t1.start()
42 t2.start()
43 t1.join()
44 t2.join()
45 print("%s账户余额:%s"%(a1.name,a1.balance))
46 
47 
48 print("%s账户余额:%s"%(a2.name,a2.balance))

结果:

条件变量同步(Condition)

      有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。

      lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。

wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
notify():条件创造后调用,通知等待池激活一个线程;
notifyAll():条件创造后调用,通知等待池激活所有线程。
 1 import threading,time
 2 from random import randint
 3 class Producer(threading.Thread):
 4     def run(self):
 5         global L
 6         while True:
 7             val=randint(0,100)
 8             print('生产者',self.name,":Append"+str(val),L)
 9             if lock_con.acquire():
10                 L.append(val)
11                 lock_con.notify()
12                 lock_con.release()
13             time.sleep(3)
14 class Consumer(threading.Thread):
15     def run(self):
16         global L
17         while True:
18                 lock_con.acquire()
19                 if len(L)==0:
20                     lock_con.wait()
21                 print('消费者',self.name,":Delete"+str(L[0]),L)
22                 del L[0]
23                 lock_con.release()
24                 time.sleep(0.25)
25 
26 if __name__=="__main__":
27 
28     L=[]
29     lock_con=threading.Condition()
30     threads=[]
31     for i in range(5):
32         threads.append(Producer())
33     threads.append(Consumer())
34     for t in threads:
35         t.start()
36     for t in threads:
37         t.join()
View Code

同步条件(Event)

      条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;

event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

实例1:

 1 import threading,time
 2 class Boss(threading.Thread):
 3     def run(self):
 4         print("BOSS:今晚大家都要加班到22:00。")
 5         event.isSet() or event.set()
 6         time.sleep(5)
 7         print("BOSS:<22:00>可以下班了。")
 8         event.isSet() or event.set()
 9 class Worker(threading.Thread):
10     def run(self):
11         event.wait()
12         print("Worker:哎……命苦啊!")
13         time.sleep(0.25)
14         event.clear()
15         event.wait()
16         print("Worker:OhYeah!")
17 if __name__=="__main__":
18     event=threading.Event()
19     threads=[]
20     for i in range(5):
21         threads.append(Worker())
22     threads.append(Boss())
23     for t in threads:
24         t.start()
25     for t in threads:
26         t.join()
View Code

实例2:

 1 import threading,time
 2 import random
 3 def light():
 4     if not event.isSet():
 5         event.set() #wait就不阻塞 #绿灯状态
 6     count = 0
 7     while True:
 8         if count < 10:
 9             print('33[42;1m--green light on---33[0m')
10         elif count <13:
11             print('33[43;1m--yellow light on---33[0m')
12         elif count <20:
13             if event.isSet():
14                 event.clear()
15             print('33[41;1m--red light on---33[0m')
16         else:
17             count = 0
18             event.set() #打开绿灯
19         time.sleep(1)
20         count +=1
21 def car(n):
22     while 1:
23         time.sleep(random.randrange(10))
24         if  event.isSet(): #绿灯
25             print("car [%s] is running.." % n)
26         else:
27             print("car [%s] is waiting for the red light.." %n)
28 if __name__ == '__main__':
29     event = threading.Event()
30     Light = threading.Thread(target=light)
31     Light.start()
32     for i in range(3):
33         t = threading.Thread(target=car,args=(i,))
34         t.start()
View Code

信号量(Semaphore)

      信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

      计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

      BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

实例:

 1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(5)
 7             semaphore.release()
 8 if __name__=="__main__":
 9     semaphore=threading.Semaphore(5)
10     thrs=[]
11     for i in range(100):
12         thrs.append(myThread())
13     for t in thrs:
14         t.start()
View Code

 如果在主机执行IO密集型任务的时候再执行这种类型的程序时,计算机就有很大可能会宕机。
这时候就可以为这段程序添加一个计数器功能,来限制一个时间点内的线程数量。

 1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(5)
 7             semaphore.release()
 8 if __name__=="__main__":
 9     semaphore=threading.BoundedSemaphore(5)
10     #信号量 设置只能5个线程使用,其余的排队吧
11     thrs=[]
12     for i in range(100):
13         thrs.append(myThread())
14     for t in thrs:
15         t.start()
View Code


多线程利器(queue) 队列   ####

1 创建一个“队列”对象import Queueq = Queue.Queue(maxsize = 10)Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
2 
3 将一个值放入队列中q.put(10)调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
4 
5 将一个值从队列中取出q.get()调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
6 
7 Python Queue模块有三种队列及构造函数:1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize)2、LIFO类似于堆,即先进后出。             class queue.LifoQueue(maxsize)3、还有一种是优先级队列级别越低越先出来。   class queue.PriorityQueue(maxsize)
8 
9 此包中的常用方法(q = Queue.Queue()):q.qsize() 返回队列的大小q.empty() 如果队列为空,返回True,反之Falseq.full() 如果队列满了,返回True,反之Falseq.full 与 maxsize 大小对应q.get([block[, timeout]]) 获取队列,timeout等待时间q.get_nowait() 相当q.get(False)非阻塞 q.put(item) 写入队列,timeout等待时间q.put_nowait(item) 相当q.put(item, False)q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号q.join() 实际上意味着等到队列为空,再执行别的操作
 1 import threading,queue
 2 from time import sleep
 3 from random import randint
 4 class Production(threading.Thread):
 5     def run(self):
 6         while True:
 7             r=randint(0,100)
 8             q.put(r)
 9             print("生产出来%s号包子"%r)
10             sleep(1)
11 class Proces(threading.Thread):
12     def run(self):
13         while True:
14             re=q.get()
15             print("吃掉%s号包子"%re)
16 if __name__=="__main__":
17     q=queue.Queue(10)
18     threads=[Production(),Production(),Production(),Proces()]
19     for t in threads:
20         t.start()
1
 1 import time,random
 2 import queue,threading
 3 q = queue.Queue()
 4 def Producer(name):
 5   count = 0
 6   while count <20:
 7     time.sleep(random.randrange(3))
 8     q.put(count)
 9     print('Producer %s has produced %s baozi..' %(name, count))
10     count +=1
11 def Consumer(name):
12   count = 0
13   while count <20:
14     time.sleep(random.randrange(4))
15     if not q.empty():
16         data = q.get()
17         print(data)
18         print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
19     else:
20         print("-----no baozi anymore----")
21     count +=1
22 p1 = threading.Thread(target=Producer, args=('A',))
23 c1 = threading.Thread(target=Consumer, args=('B',))
24 p1.start()
25 c1.start()
2
 1 #实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块)
 2 # 实现一个线程从上面的队列里面不断的取出奇数
 3 # 实现另外一个线程从上面的队列里面不断取出偶数
 4 
 5 import random,threading,time
 6 from queue import Queue
 7 #Producer thread
 8 class Producer(threading.Thread):
 9   def __init__(self, t_name, queue):
10     threading.Thread.__init__(self,name=t_name)
11     self.data=queue
12   def run(self):
13     for i in range(10):  #随机产生10个数字 ,可以修改为任意大小
14       randomnum=random.randint(1,99)
15       print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum))
16       self.data.put(randomnum) #将数据依次存入队列
17       time.sleep(1)
18     print ("%s: %s finished!" %(time.ctime(), self.getName()))
19 
20 #Consumer thread
21 class Consumer_even(threading.Thread):
22   def __init__(self,t_name,queue):
23     threading.Thread.__init__(self,name=t_name)
24     self.data=queue
25   def run(self):
26     while 1:
27       try:
28         val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
29         if val_even%2==0:
30           print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even))
31           time.sleep(2)
32         else:
33           self.data.put(val_even)
34           time.sleep(2)
35       except:   #等待输入,超过5秒 就报异常
36         print ("%s: %s finished!" %(time.ctime(),self.getName()))
37         break
38 class Consumer_odd(threading.Thread):
39   def __init__(self,t_name,queue):
40     threading.Thread.__init__(self, name=t_name)
41     self.data=queue
42   def run(self):
43     while 1:
44       try:
45         val_odd = self.data.get(1,5)
46         if val_odd%2!=0:
47           print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd))
48           time.sleep(2)
49         else:
50           self.data.put(val_odd)
51           time.sleep(2)
52       except:
53         print ("%s: %s finished!" % (time.ctime(), self.getName()))
54         break
55 #Main thread
56 def main():
57   queue = Queue()
58   producer = Producer('Pro.', queue)
59   consumer_even = Consumer_even('Con_even.', queue)
60   consumer_odd = Consumer_odd('Con_odd.',queue)
61   producer.start()
62   consumer_even.start()
63   consumer_odd.start()
64   producer.join()
65   consumer_even.join()
66   consumer_odd.join()
67   print ('All threads terminate!')
68 
69 if __name__ == '__main__':
70   main()
3

注意:列表是线程不安全的

 1 import threading,time
 2 
 3 li=[1,2,3,4,5]
 4 
 5 def pri():
 6     while li:
 7         a=li[-1]
 8         print(a)
 9         time.sleep(1)
10         try:
11             li.remove(a)
12         except:
13             print('----',a)
14 
15 t1=threading.Thread(target=pri,args=())
16 t1.start()
17 t2=threading.Thread(target=pri,args=())
18 t2.start()
View Code
原文地址:https://www.cnblogs.com/Mengchangxin/p/9454285.html