线程

多进程和多线程选择

    一般来说IO密集型用线程(python中线程无法夸cpu执行,i在IO操作只会少许占用cpu,线程安全),计算密集型用多进程(多进程可以跑在不同cpu上)。线程带GIL锁,同时操作IO时候只有一个可以成功操作。

    多进程和线程开多少合适? 理论上多进程个数可以等于cpu核数,线程数量看业务而定,cpu在线程上切换也会占用时间(上下文切换)

thread方法说明

t.start() : 激活线程,

t.getName() : 获取线程的名称

t.setName() : 设置线程的名称 

t.name : 获取或设置线程的名称

t.is_alive() : 判断线程是否为激活状态

t.isAlive() :判断线程是否为激活状态

t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

t.isDaemon() : 判断是否为守护线程

t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

t.run() :线程被cpu调度后自动执行线程对象的run方法

简单线程操作

 1 #-*- coding:utf-8 -*-
 2 import time
 3 import threading
 4 
 5 
 6 def f0():pass
 7 
 8 
 9 def f1(*args):
10     time.sleep(10)
11     f0()
12 
13 #线程执行前准备
14 t= threading.Thread(target=f1,args=(111,222))
15 #开始执行线程
16 t.start()
17 
18 
19 t= threading.Thread(target=f1,args=(111,222))
20 t.start()
21 t= threading.Thread(target=f1,args=(111,222))
22 t.start()

线程锁

我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现不可预料的结果,为了保证数据的准确性,引入了锁的概念。

 1 #-*- coding:utf-8  -*-
 2 import threading
 3 import time
 4  
 5 globals_num = 0
 6  
 7 lock = threading.RLock()
 8  
 9 def Func():
10     lock.acquire()  # 获得锁 
11     global globals_num
12     globals_num += 1
13     time.sleep(1)
14     print(globals_num)
15     lock.release()  # 释放锁 
16  
17 for i in range(10):
18     t = threading.Thread(target=Func)
19     t.start()
View Code

threading.RLock和threading.Lock 的区别

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

线程之间通知(Event)

Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

  • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
  • Event.set() :将标识位设为Ture
  • Event.clear() : 将标识位设为False。
  • Event.isSet() :判断标识位是否为Ture。
 1 #-*- coding:utf-8  -*-
 2 import threading
 3 
 4 def do(event):
 5     print('start')
 6     #线程在这里阻塞了。等待event_obj.set()方法才能继续执行线程
 7     event.wait()
 8     print('execute')
 9 
10 event_obj = threading.Event()
11 for i in range(10):#加锁
12     t = threading.Thread(target=do,args=(event_obj,))
13     t.start()
14 
15 #event_obj.clear()设置为flase 线程就阻塞了。
16 event_obj.clear()
17 
18 inp = raw_input('input:')#放锁
19 print(inp)
20 if inp == 'true':
21     #发送event事件,线程有阻塞变成无阻塞
22     event_obj.set()
event

当某事过程完毕后改变事件,被阻塞的线程将继续执行

threading.Condition:

一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。

condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。

其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

  • wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。

如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。

在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。 例子: 生产者-消费者模型

 1 #-*- coding:utf-8 -*-
 2 import threading
 3 import time
 4 
 5 
 6 def consumer(cond):
 7     #不管释放锁还是加锁都需要先用with
 8     with cond:
 9         print("consumer before wait")
10         #等待解锁,如果没写cond.wait()这个方法程序会往下执行
11         cond.wait()
12         print("consumer after wait")
13 
14 
15 #释放锁
16 def producer(cond):
17     with cond:
18         print("producer before notifyAll")
19         #释放锁,这个必须是同一个condition对象
20         cond.notifyAll()
21         print("producer after notifyAll")
22 
23 #定义condition对象
24 condition = threading.Condition()
25 
26 c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
27 c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
28 
29 p = threading.Thread(name="p", target=producer, args=(condition,))
30 
31 c1.start()
32 time.sleep(2)
33 c2.start()
34 time.sleep(2)
35 
36 #释放锁
37 p.start()
condition

自定义线程池(线程池默认没有)

一简单的方法实现线程池

 1 #-*- coding:utf-8 -*-
 2 import  threading
 3 import time
 4 import Queue
 5 
 6 
 7 #线程池
 8 class ThreadPool(object):
 9 
10     def __init__(self,max_num=20):
11         #创建队列默认20个
12         self.queue = Queue.Queue(max_num)
13 
14         #增加线程方法到Queue里面
15         for i in xrange(max_num):
16             self.queue.put(threading.Thread)
17 
18     #获取队列线程
19     def get_thread(self):
20         return self.queue.get()
21 
22     def add_thread(self):
23         self.queue.put(threading.Thread)
24 
25 
26 def func(pool,num):
27     time.sleep(1)
28     print(num)
29     #取一个线程就需要往队列中在放一个线程
30     pool.add_thread()
31 
32 p = ThreadPool(5)
33 
34 for i in range(10):
35     thread=p.get_thread()
36     t= thread(target=func,args=(p,i,))
37     t.start()
View Code

二增强版线程池

  1 #-*- coding:utf-8 -*-
  2 import  threading
  3 import time
  4 import Queue
  5 import contextlib
  6 
  7 StopEvent=object()
  8 
  9 class ThreadPool(object):
 10     def __init__(self,max_num):
 11         #这个Q是无限大的用来接受任务
 12         self.q = Queue.Queue()
 13 
 14         #执行线程计数器
 15         self.work_num =0
 16 
 17         #为True线程会立即终止执行
 18         self.terminal = False
 19 
 20         #最多创建的线程数
 21         self.max_num = max_num
 22 
 23         #真实创建的线程列表
 24         self.generate_list = []
 25 
 26         #空闲线程列表
 27         self.free_list = []
 28 
 29     def run(self,func,args,cllback=None):
 30 
 31         #w元组类型存储的执行函数,参数,callback函数。put到任务队列中去
 32         w = (func,args,cllback)
 33         self.q.put(w)
 34 
 35         #判断是否创建新的线程
 36         #1空线程等于0
 37         #2工作线程小于最大线程数
 38         if len(self.free_list) == 0 and len(self.generate_list)<self.max_num:
 39            #创建线程
 40            self.generate_thread()
 41 
 42     #创建线程
 43     def generate_thread(self):
 44         #启动一个线程然后调用self.call方法
 45         t = threading.Thread(target=self.call)
 46         t.start()
 47 
 48     #当任务执行完毕后往队列中增加停止符
 49     def close(self):
 50         while self.generate_list:
 51             self.q.put(StopEvent)
 52 
 53 
 54     def terminate(self):
 55         self.terminal = True
 56         self.close()
 57         #清空队列.这里已经终止队列中可能还有很多任务没有完成
 58         self.q.empty()
 59 
 60     #做任务
 61     def call(self):
 62         #获取当前线程池
 63         current_thread = threading.currentThread
 64         #加入工作列表
 65         self.generate_list.append(current_thread)
 66 
 67         #获取q中的任务,没有任务get就会阻塞线程
 68         event = self.q.get()
 69 
 70         #增加计数器
 71         self.work_num +=1
 72 
 73         while event != StopEvent:
 74              #获取的event 如果是元组那么它就是任务,其它格式肯定不是任务
 75              #解开任务包
 76              func, args ,callable = event
 77 
 78 
 79              try:
 80                     #获取执行方法的返回值
 81                     result = func(*args)
 82                     #状态
 83                     status = True
 84              except Exception as e:
 85                     #func函数执行过程中报异常了,这里捕获一下
 86                     result = e
 87                     #状态改为false
 88                     status = False
 89 
 90              #判断一下是否需要回调
 91              if callable is not None:
 92                  try:
 93                      #执行回调函数,并且把结果当参数传递到callable中
 94                      #回调函数中只需要判断下status就知道func函数是否执行成功
 95                      callable(status,result)
 96                  except Exception as e:
 97                      pass
 98              if self.terminal:
 99                  event = StopEvent
100              else:
101                  #标记,free_list空闲列表中
102                  self.free_list.append(current_thread)
103 
104                  #获取任务,如果没有get方法将阻塞线程,等待任务到来
105                  event = self.q.get()
106 
107                  #从空闲队列中移除线程
108                  self.free_list.remove(current_thread)
109 
110         else:#这里event == StopEvent 就执行这里,说明event不是元组
111             self.generate_list.remove(current_thread)
112 
113 
114 def f1(args):
115     import time
116     time.sleep(0.5)
117     print(args)
118 
119 #线程池初始线程设置
120 pool = ThreadPool(10)
121 
122 for i in range(5):
123     pool.run(f1,(i,))
124 
125 #增加停止符
126 #pool.close()
127 
128 #立即停止
129 pool.terminate()
130 
131 #查看用了多少个工作线程
132 time.sleep(3)
133 print('
work:%d'%pool.work_num)
View Code

用with改造下面流程:

self.free_list.append(current_thread)
event = self.q.get()
self.free_list.remove(current_thread)

增加ThreadPool类中的一个方法:

1 #上下文管理函数
2     @contextlib.contextmanager
3     def work_status(self,current_thread):
4         self.free_list.append(current_thread)
5         try:
6               yield
7         finally:
8             self.free_list.remove(current_thread)
View Code

将上面改造的三行代码改成:

1 with self.work_status(current_thread):
2                      event = self.q.get()
View Code
原文地址:https://www.cnblogs.com/menkeyi/p/7089297.html