Python(多线程threading模块)

day27

参考:http://www.cnblogs.com/yuanchenqi/articles/5733873.html

CPU像一本书,你不阅读的时候,你室友马上阅读,你准备阅读的时候,你室友记下他当时页码,等下次你不读的时候开始读。

多个线程竞争执行。

进程:A process can have one or many threads.一个进程有多个线程。

一个线程就是一堆指令集合。

线程和进程是同样的东西。

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

 1 import time
 2 import threading
 3 
 4 begin = time.time()
 5 def foo(n):
 6     print('foo%s'%n)
 7     time.sleep(1)
 8 
 9 def bar(n):
10     print('bar%s'%n)
11     time.sleep(2)
12 
13 
14 # foo()
15 # bar()
16 # end = time.time()
17 
18 #并发,两个线程竞争执行
19 t1 = threading.Thread(target = foo, args =(1,) )
20 t2 = threading.Thread(target = bar, args =(2,) )
21 t1.start()
22 t2.start()
23 
24 t1.join()#t1,t2执行完再往下执行
25 t2.join()
26 #t1,t2同时执行
27 end = time.time()
28 
29 
30 print(end - begin)
并发,两个线程竞争执行

执行结果:

foo1
bar2
2.002244710922241

Process finished with exit code 0

IO密集型任务函数(以上为IO密集型)计算效率会被提高,可用多线程

计算密集型任务函数(以下为计算密集型)改成C语言

 1 import time
 2 import threading
 3 begin = time.time()
 4 
 5 def add(n):
 6     sum = 0
 7     for i in range(n):
 8         sum += i
 9     print(sum)
10 
11 # add(50000000)
12 # add(80000000)
13 
14 #并发,两个线程竞争执行
15 t1 = threading.Thread(target = add, args =(50000000,) )
16 t2 = threading.Thread(target = add, args =(80000000,) )
17 t1.start()
18 t2.start()
19 
20 t1.join()#t1,t2执行完再往下执行
21 t2.join()
22 end = time.time()
23 
24 print(end - begin)

计算密集型中用并发计算效率并没有提高。

计算效率并没有提高。

GIL

In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once.

在同一时刻,只能有一个线程。

使之有多个进程就可以解决(如果三个线程无法同时进行,那么把它们分到三个进程里面去,用于解决GIL问题,实现并发)。

 线程与进程的区别:

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

threading_test.py

 1 import threading
 2 from time import ctime,sleep
 3 import time
 4 
 5 def music(func):
 6     for i in range(2):
 7         print ("Begin listening to %s. %s" %(func,ctime()))
 8         sleep(1)
 9         print("end listening %s"%ctime())
10 
11 def move(func):
12     for i in range(2):
13         print ("Begin watching at the %s! %s" %(func,ctime()))
14         sleep(5)
15         print('end watching %s'%ctime())
16 
17 threads = []
18 t1 = threading.Thread(target=music,args=('七里香',))
19 threads.append(t1)
20 t2 = threading.Thread(target=move,args=('阿甘正传',))
21 threads.append(t2)
22 
23 if __name__ == '__main__':
24 
25     for t in threads:#线程加到了列表中
26         # t.setDaemon(True)
27         t.start()
28         # t.join()
29     # t1.join()
30     #t2.join()########考虑这三种join位置下的结果?
31     print ("all over %s" %ctime())
32 
33 #一共执行10秒

一共只执行10秒,因为是同时执行,看哪个时间长。2*5s

执行结果:

Begin listening to 七里香. Fri Nov  2 16:43:09 2018
all over Fri Nov  2 16:43:09 2018
Begin watching at the 阿甘正传! Fri Nov  2 16:43:09 2018
end listening Fri Nov  2 16:43:10 2018
Begin listening to 七里香. Fri Nov  2 16:43:10 2018
end listening Fri Nov  2 16:43:11 2018
end watching Fri Nov  2 16:43:14 2018
Begin watching at the 阿甘正传! Fri Nov  2 16:43:14 2018
end watching Fri Nov  2 16:43:20 2018

Process finished with exit code 0

join

 1 import threading
 2 from time import ctime,sleep
 3 import time
 4 
 5 def music(func):
 6     for i in range(2):
 7         print ("Begin listening to %s. %s" %(func,ctime()))
 8         sleep(2)
 9         print("end listening %s"%ctime())
10 
11 def move(func):
12     for i in range(2):
13         print ("Begin watching at the %s! %s" %(func,ctime()))
14         sleep(3)
15         print('end watching %s'%ctime())
16 
17 threads = []
18 t1 = threading.Thread(target=music,args=('七里香',))
19 threads.append(t1)
20 t2 = threading.Thread(target=move,args=('阿甘正传',))
21 threads.append(t2)
22 
23 if __name__ == '__main__':
24 
25     for t in threads:#线程加到了列表中
26         # t.setDaemon(True)
27         t.start()
28         #t.join() #变成了串行  t1已经执行完了,但是t2阻塞了,其中t为t2
29     t1.join() #all over在第四秒就会被打印,因为t1四秒执行完,不再阻塞,而t2还在执行
30     #t2.join()########考虑这三种join位置下的结果?
31     print ("all over %s" %ctime())
32 
33 #一共执行6秒

t1.join,t1执行完才能到下一步,所以4秒后才能print ("all over %s" %ctime())

t2.join,t2执行结束才能到下一步,所以6秒后才能print ("all over %s" %ctime())

如果将t.join()放到for循环中,即和串行一样先执行t1,再执行t2。

 

setDeamon

 1 import threading
 2 from time import ctime,sleep
 3 import time
 4 
 5 def music(func):
 6     for i in range(2):
 7         print ("Begin listening to %s. %s" %(func,ctime()))
 8         sleep(2)
 9         print("end listening %s"%ctime())
10 
11 def move(func):
12     for i in range(2):
13         print ("Begin watching at the %s! %s" %(func,ctime()))
14         sleep(3)
15         print('end watching %s'%ctime())
16 
17 threads = []
18 t1 = threading.Thread(target=music,args=('七里香',))
19 threads.append(t1)
20 t2 = threading.Thread(target=move,args=('阿甘正传',))
21 threads.append(t2)
22 
23 if __name__ == '__main__':
24 
25     t2.setDaemon(True)
26     for t in threads:#线程加到了列表中
27         #t.setDaemon(True)
28         t.start()
29 
30     print ("all over %s" %ctime())
31 
32 #主线程只会等待没设定的子线程t1,t2被设定setDaemon
33 #t1已经执行完(4s),但是t2还没执行完,和主线程一起退出
32 #主线程只会等待没设定的子线程t1,t2被设定setDaemon
33 #t1已经执行完(4s),但是t2还没执行完,和主线程一起退出

执行结果:
Begin listening to 七里香. Fri Nov  2 17:33:29 2018
Begin watching at the 阿甘正传! Fri Nov  2 17:33:29 2018
all over Fri Nov  2 17:33:29 2018
end listening Fri Nov  2 17:33:31 2018
Begin listening to 七里香. Fri Nov  2 17:33:31 2018
end watching Fri Nov  2 17:33:32 2018
Begin watching at the 阿甘正传! Fri Nov  2 17:33:32 2018
end listening Fri Nov  2 17:33:33 2018

Process finished with exit code 0

4秒就结束。

 

 print属于主线程!


继承式调用
 1 import threading
 2 import time
 3 
 4 
 5 class MyThread(threading.Thread):#继承
 6     def __init__(self, num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9 
10     def run(self):  # 定义每个线程要运行的函数
11 
12         print("running on number:%s" % self.num)
13 
14         time.sleep(3)
15 if __name__ == '__main__':
16     t1 = MyThread(1)
17     t2 = MyThread(2)
18     t1.start()
19     t2.start()

同步锁

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每个线程中都获取这个全局变量
 6 
 7     temp = num
 8 
 9     time.sleep(0.0001)#在前一次还没执行完,就开始减1
10     num =temp-1 #对此公共变量进行-1操作
14 
15 num = 100  #设定一个共享变量
16 thread_list = []
17 r = threading.Lock()#同步锁
18 for i in range(100):
19 
20     t = threading.Thread(target=addNum)
21     t.start()
22     thread_list.append(t)
23 
24 for t in thread_list: #等待所有线程执行完毕
25     t.join()
26 
27 print('final num:', num )#有join所有执行完再输出

执行结果:

final num: 47

Process finished with exit code 0

最终结果不是0的原因:由于有sleep的原因,100个减一操作几乎同时进行,前一次还在sleep没进行减法运算,全局变量就被后一次线程进行减法运算。

正常情况:100-1=99,99-1=98........1-1 = 0。

有sleep:100-1=99(还没减),全局变量100被拿走,进行下一线程的运算100-1=99,造成最后结果不为0;

解决方法:同步锁,使数据运算部分变成了串行。

 1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每个线程中都获取这个全局变量
 6     #num -= 1
 7 
 8     r.acquire()#同步锁,又变成串行
 9     temp = num
10     #print('--get num:',num )
11     time.sleep(0.0001)#在前一次还没执行完,就开始减1
12     num =temp-1 #对此公共变量进行-1操作
13     r.release()
14     #只是将以上的部分变成了串行
15 
16     print('ok')
17     #将不是数据的部分内容不放到锁中,100个线程同时拿到ok,这部分将不是串行,而是并发
18 
19 
20 num = 100  #设定一个共享变量
21 thread_list = []
22 r = threading.Lock()#同步锁
23 for i in range(100):
24 
25     t = threading.Thread(target=addNum)
26     t.start()
27     thread_list.append(t)
28 
29 for t in thread_list: #等待所有线程执行完毕
30     t.join()
31 
32 print('final num:', num )#有join所有执

锁中的部分变成了串行,只有运行结束才进入下一线程。

但是锁外面的部分print('ok')还是并发的,100个线程同时拿到ok。

死锁和递归锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lockA.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)#以上部分被A锁住
 8 
 9         lockB.acquire()#下面的也锁住
10         print(self.name,"gotlockB",time.ctime())
11 
12         lockB.release()#释放后,执行doB
13         lockA.release()
14 
15     def doB(self):
16         #在此过程中,第二个线程进入,因为A,B已经被释放
17         lockB.acquire()#有锁,正常输出,由于第二个进入,所以A(第二个线程),B(第一个线程)几乎同时获取
18         #但是之后第一个线程想要获取A锁的时候,A锁已经被第二个线程占着,造成死锁.
19         print(self.name,"gotlockB",time.ctime())
20         time.sleep(2)
21 
22         lockA.acquire()
23         print(self.name,"gotlockA",time.ctime())#没有被打印,反而第二个线程的被打印了
24 
25         lockA.release()
26         lockB.release()
27 
28     def run(self):
29         self.doA()
30         self.doB()
31 
32 if __name__=="__main__":
33 
34     lockA=threading.Lock()#两个锁
35     lockB=threading.Lock()
36 
37     #lock = threading.RLock()#该锁可以多次获取,多次acquire和release
38     threads=[]
39     for i in range(5):#5个线程
40         threads.append(myThread())
41     for t in threads:
42         t.start()
43     for t in threads:
44         t.join()#等待线程结束,后面再讲。

执行结果:

Thread-1 gotlockA Sat Nov  3 13:40:48 2018
Thread-1 gotlockB Sat Nov  3 13:40:51 2018
Thread-1 gotlockB Sat Nov  3 13:40:51 2018
Thread-2 gotlockA Sat Nov  3 13:40:51 2018

以上程序卡住不能运行,doA运行完锁A锁B都释放,准备运行doB,休眠2秒后,获取锁A,此时由于线程锁都被释放,可以进入其他线程,如进入线程二,同时也获取锁A,两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

解决方法:

 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lock.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)#以上部分被A锁住
 8 
 9         lock.acquire()#下面的也锁住
10         print(self.name,"gotlockB",time.ctime())
11 
12         lock.release()#释放后,执行doB
13         lock.release()
14 
15     def doB(self):
16         #在此过程中,第二个线程进入,因为A,B已经被释放
17         lock.acquire()#有锁,正常输出,由于第二个进入,所以A(第二个线程),B(第一个线程)几乎同时获取
18         #但是之后第一个线程想要获取A锁的时候,A锁已经被第二个线程占着,造成死锁.
19         print(self.name,"gotlockB",time.ctime())
20         time.sleep(2)
21 
22         lock.acquire()
23         print(self.name,"gotlockA",time.ctime())#没有被打印,反而第二个线程的被打印了
24 
25         lock.release()
26         lock.release()
27 
28     def run(self):
29         self.doA()
30         self.doB()
31 
32 if __name__=="__main__":
33 
34     # lockA=threading.Lock()#两个锁
35     # lockB=threading.Lock()
36 
37     lock = threading.RLock()#该锁可以多次获取,多次acquire和release
38     threads=[]
39     for i in range(5):#5个线程
40         threads.append(myThread())
41     for t in threads:
42         t.start()
43     for t in threads:
44         t.join()#等待线程结束,后

lock = threading.RLock(),该锁可以重用,只用lock。不用lockA,lockB。

执行结果:

Thread-1 gotlockA Sat Nov  3 13:48:04 2018
Thread-1 gotlockB Sat Nov  3 13:48:07 2018
Thread-1 gotlockB Sat Nov  3 13:48:07 2018
Thread-1 gotlockA Sat Nov  3 13:48:09 2018
Thread-3 gotlockA Sat Nov  3 13:48:09 2018
Thread-3 gotlockB Sat Nov  3 13:48:12 2018
Thread-4 gotlockA Sat Nov  3 13:48:12 2018
Thread-4 gotlockB Sat Nov  3 13:48:15 2018
Thread-4 gotlockB Sat Nov  3 13:48:15 2018
Thread-4 gotlockA Sat Nov  3 13:48:17 2018
Thread-2 gotlockA Sat Nov  3 13:48:17 2018
Thread-2 gotlockB Sat Nov  3 13:48:20 2018
Thread-2 gotlockB Sat Nov  3 13:48:20 2018
Thread-2 gotlockA Sat Nov  3 13:48:22 2018
Thread-5 gotlockA Sat Nov  3 13:48:22 2018
Thread-5 gotlockB Sat Nov  3 13:48:25 2018
Thread-3 gotlockB Sat Nov  3 13:48:25 2018
Thread-3 gotlockA Sat Nov  3 13:48:27 2018
Thread-5 gotlockB Sat Nov  3 13:48:27 2018
Thread-5 gotlockA Sat Nov  3 13:48:29 2018

信号量(Semaphore)

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

 1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(3)
 7             semaphore.release()
 8 if __name__=="__main__":
 9     semaphore=threading.Semaphore(5)
10     thrs=[]
11     for i in range(23):
12         thrs.append(myThread())
13     for t in thrs:
14         t.start()

 执行结果:

Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
Thread-10
Thread-11
Thread-12
Thread-13
Thread-14
Thread-15
Thread-16
Thread-17
Thread-18
Thread-19
Thread-20
Thread-21
Thread-22
Thread-23

Process finished with exit code 0

一次同时输出五个,最后一次输出三个。

条件变量同步(Condition)

线程间通信的作用

 1 import threading,time
 2 from random import randint
 3 class Producer(threading.Thread):
 4     def run(self):
 5         global L#一屉
 6         while True:
 7             val=randint(0,100)
 8             print('生产者',self.name,":Append"+str(val),L)
 9 
10             if lock_con.acquire():#锁 ,与lock_con.acquire()一样
11                 L.append(val)#做包子,从后面加
12                 lock_con.notify()#通知wait,激活wait
13                 lock_con.release()
14             time.sleep(3)
15 class Consumer(threading.Thread):
16     def run(self):
17         global L
18         while True:
19                 lock_con.acquire()
20                 if len(L)==0:#没包子
21                     lock_con.wait()#wait阻塞
22 
23                 print('消费者',self.name,":Delete"+str(L[0]),L)
24                 del L[0]#从前面吃
25                 lock_con.release()
26                 time.sleep(0.1)
27 
28 if __name__=="__main__":
29 
30     L=[]
31     lock_con=threading.Condition()#条件变量的锁
32     threads=[]
33     for i in range(5):#启动五个人在做包子,5个线程
34         threads.append(Producer())
35     threads.append(Consumer())#
36     for t in threads:
37         t.start()
38     for t in threads:
39         t.join()

当一屉中有包子的时候,notify激活waiting,添加包子,和吃包子时有线程锁。

同步条件(Event)

 1 import threading,time
 2 
 3 class Boss(threading.Thread):
 4     def run(self):
 5         print("BOSS:今晚大家都要加班到22:00。")
 6         event.isSet() or event.set()#set()设为true
 7         time.sleep(5)
 8         print("BOSS:<22:00>可以下班了。")
 9         event.isSet() or event.set()
10 
11 class Worker(threading.Thread):
12     def run(self):
13         event.wait()#等待老板决定,阻塞
14         print("Worker:哎……命苦啊!")
15         #event.clear()  # 标志位 False 等老板说可以下班, 设为true
16         time.sleep(1)
17         event.clear()#标志位 False 等老板说可以下班, 设为true
18         event.wait()#等老板说别的 ,设为true后
19         print("Worker:OhYeah!") #print Oh,Yeah
20 
21 if __name__=="__main__":
22     event=threading.Event()
23     threads=[]
24     for i in range(5):#五个worker
25         threads.append(Worker())
26     threads.append(Boss())#一个老板
27     for t in threads:
28         t.start()
29     for t in threads:
30         t.join()

boss说完后,5个worker马上能有反应。boss输出后,even.set(),标志位变为True,worker中的event.wait()才能停止阻塞。之后还需将标志位设为False,即event.clear()。

再次等待boss说完话后even.set()将标志位变为True,worker再次发言。

执行结果:

BOSS:今晚大家都要加班到22:00。
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
Worker:哎……命苦啊!
BOSS:<22:00>可以下班了。
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!
Worker:OhYeah!

Process finished with exit code 0
 
原文地址:https://www.cnblogs.com/112358nizhipeng/p/9896292.html