Python之Threading模块

Thread

先引入一个例子:

>>> from threading import Thread,currentThread,activeCount
>>> 
>>> def test(s):
...     print "ident:",currentThread().ident
...     print "count:",activeCount()
...     print s
... 
>>> 
>>> Thread(target = test, args =('Hello',)).start()
ident: 1099229504
count: 2
Hello

需要模块threading,对应的帮助文档:

http://docs.python.org/2.7/library/threading.html#module-threading

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

除了标识符,还可以给线程取个名字,便于调试。

还可以继承Thread实现自己的线程类:

>>> from threading import *
>>> 
>>> class MyThread(Thread):
...     def __init__(self,name,*args):
...             super(MyThread,self).__init__(name = name)#调用父类的init,设置线程的名称
...             self.data = args
...     
...     def run(self):
...             print self.name,self.data
... 
>>> 
>>> MyThread("abc",range(10)).start()
abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)>>> 

>>> 
>>> MyThread("abc",range(5)).start() 
abc ([0, 1, 2, 3, 4],)
>>> MyThread("abc",range(10)).start()
abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)

将线程daemon属性设为True,那么表示这是一个背景线程,进程退出时不会等到该线程结束。

调用join()等到线程结束,可提供超时参数(秒,浮点数设定更小粒度)。

isAlive()检查线程状态,join()可以多次调用。

>>> from time import sleep
>>> 
>>> def test():
...     print "__thread__start__"
...     sleep(10)
...     print "__thread__exit__"
... 
>>> 
>>> def run():
...     t = Thread(target = test)
...     t.start()
...     t.join(2) //设置超时时间为2s
...     
...     print t.isAlive()//检查线程状态
...     t.join() //再次等待
...     
...     print "over!"
... 
>>> 
>>> run()
__thread__start__
True
__thread__exit__
over!

Lock

Lock不支持递归加锁,也就是说即便在同一个线程中,也必须等待锁释放。通常建议改用RLock,它会处理"owning thread"和"recursion level"状态,对于同一个线程的多次请求锁行为,只累加计数器。每次调用release()将递减该计数器,直到0时释放锁,因此acquire()和relase()必须承兑出现,一个加锁,一个释放。

threading中的成员大多实现了上下文协议,尽可能用with代替手工调用。

>>> lock = RLock()
>>> 
>>> def show(i):
...     with lock:
...             print currentThread().name,i
...             sleep(0.1)
... 
>>> def test():
...     with lock:
...             for i in range(5):
...                     show(i)
... 
>>> 
>>> for i in range(2):
...     Thread(target = test).start()
... 
>>>
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-3 0
Thread-3 1
Thread-3 2
Thread-3 3
Thread-3 4

所有线程等待lock锁,串行执行。

Event

Event通过一个内部标记来协调多线程运行。方法wait()阻塞线程执行,直到标记为True。

set()将标记设为True,clear()更改标记为False。isSet()用于判断标记状态。

>>> from threading import *
>>> 
>>> def test():
...     e = Event()
...     def test():
...             for i in range(5):
...                     e.wait()
...                     e.clear()
...                     print i
...     Thread(target = test).start()
...     return e
... 
>>> e = test()
>>> e.set()
>>> 0

>>> e.set()
>>> 1

>>> e.set()
>>> 2

>>> e.set()
>>> 3

如果不调用clear(),那么标记一直为True,wait()就不会发生堵塞行为。

通常为每个线程准备一个独立的Event,而不是多个线程共享,以避免未及时调用clear(0时发生意外情况。

condition

condition像Lock和Event的综合体,除基本的锁操作外,还提供了类似yield的功能。

在获取锁以后,可以调用wait()临时让出锁,当前线程被阻塞,直到notify()发送通知后再次请求锁来恢复执行。将wait当做yield,那么notify就是send

可以将已有的锁对象传给Condition

>>> from threading import *
>>> from time import sleep
>>> 
>>> 
>>> cond = Condition()
>>> 
>>> def t1():
...     with cond:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
...                     if i == 3:cond.wait()
... 
>>> 
>>> def t2():
...     with cond:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
...             cond.notify()
... 
>>> 
>>> Thread(target = t1).start();Thread(target = t2).start()
Thread-1 0
>>> Thread-1 1
Thread-1 2
Thread-1 3 //调用wait(),获取锁,当前线程被阻塞
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4//t2执行完range(5)循环,通过cond.notify()发送通知,重新获取锁,继续执行

只有获取锁的线程才能调用wait()和notify(),因此必须在锁释放前调用。

当wait()释放锁后,其他线程也可进入wait状态。notifyAll()激活所有等待线程,让它们去抢锁然后完成后继续执行。

>>> def test():
...     with cond:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
...                     if i == 2:cond.wait()
... 
>>> 
>>> Thread(target = test).start();Thread(target = test).start()
Thread-3 0

>>> Thread-3 1
Thread-3 2
Thread-4 0
Thread-4 1
Thread-4 2 //Thread-4等待,Thread-3持有锁

>>> with cond:cond.notifyAll() //通知所有cond.wait线程
... 
>>> Thread-3 3 //Thread-3和Thread-4再次抢锁完成后继续执行,顺序不定
Thread-3 4
Thread-4 3
Thread-4 4

Semaphore

Semaphore通过一个计数器来限制可同时运行的线程数量。计数器表示还可以运行的线程数量。

acquire()递减计数器,release()则是增加计数器。

>>> sem  = Semaphore(2)
>>> 
>>> def test():
...     with sem:
...             for i in range(5):
...                     print currentThread().name,i
...                     sleep(0.1)
... 
>>> 
>>> for i in range(3):
...     Thread(target = test).start()
... 
Thread-5 0//线程5和6同时执行,获取计数器,使其减为0,故使得线程7被阻塞
Thread-6 0
>>> Thread-5 1
Thread-6 1
Thread-5 2
Thread-6 2
Thread-5 3
Thread-6 3
Thread-5 4
Thread-6 4//线程5和线程6执行完成后,释放信号量,线程7开始执行
Thread-7 0
Thread-7 1
Thread-7 2
Thread-7 3
Thread-7 4

线程5和6同时执行,获取计数器,使其减为0,故使得线程7被阻塞,故前面输出只有线程5和线程6。

在线程5和线程6执行完成后,释放信号量,线程7开始执行。

Timer

用一个独立线程在n秒后执行某个函数。如定时器尚未执行,可用cancel()取消,定时器仅执行一次。

>>> import datetime
>>> from threading import *
>>> 
>>> def test():
...     print datetime.datetime.now()
... 
>>> Timer(2,test).start()
>>> 2013-10-29 21:28:07.990131

mark:import datetime和from datetime import *

Local

TLS(thread-lock storage)为线程提供独立的存储空间。

>>> from threading import *
>>> 
>>> from time import sleep
>>> 
>>> data = local()
>>> 
>>> def test(fn,x):
...     data.x = x
...     
...     for i in range(5):
...             data.x = fn(data.x)
...             print currentThread().name,data.x
...             sleep(0.1)
...     
... 
>>> 
>>> t1 = (lambda x:x+1,0)
>>> t2 = (lambda x:x+'a','a')
>>> 
>>> for d in (t1,t2):
...     Thread(target = test,args = d).start()
... 
Thread-1 1
 Thread-2 aa

>>> Thread-1 2
Thread-2 aaa
Thread-1 3
Thread-2 aaaa
Thread-1 4
Thread-2 aaaaa
Thread-1 5
Thread-2 aaaaaa
原文地址:https://www.cnblogs.com/gsblog/p/3393242.html