python之路之线程,进程,协程2

一、线程

  1、创建线程

  2、主线程是否等待子线程

     t.setDaemon(Ture/False):默认是false,等待子线程完成,ture,表示不等待子线程结束

   3、主线程等待,子线程执行

    join(),一直等到子线程结束

    join(3),最多等待3秒,如果子线程需要两秒,则等待2秒。

  4、线程锁

    R.rlock()

 1 #!/usr/bin/env python
 2 #coding:utf-8
 3    
 4 import threading
 5 import time
 6    
 7 gl_num = 0
 8    
 9 lock = threading.RLock()
10    
11 def Func():
12     lock.acquire()
13     global gl_num
14     gl_num +=1
15     time.sleep(1)
16     print gl_num
17     lock.release()
18        
19 for i in range(10):
20     t = threading.Thread(target=Func)
21     t.start()
线程锁

  5、线程事件

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3  
 4 import threading
 5  
 6  
 7 def do(event):
 8     print 'start'
 9     event.wait()
10     print 'execute'
11  
12  
13 event_obj = threading.Event()
14 for i in range(10):
15     t = threading.Thread(target=do, args=(event_obj,))
16     t.start()
17  
18 event_obj.clear()
19 inp = raw_input('input:')
20 if inp == 'true':
21     event_obj.set()
线程事件

  6、线程池

    python内部没有提供

    需要自定义

 二、生产者消费者模型及队列

三、进程

  1、创建进程

  2、daemon

      默认false,歘歘

  3、jion()等待

    

  4、进程之间数据不能共享

 1 #!/usr/bin/env python
 2 #coding:utf-8
 3  
 4 from multiprocessing import Process
 5 from multiprocessing import Manager
 6  
 7 import time
 8  
 9 li = []
10  
11 def foo(i):
12     li.append(i)
13     print 'say hi',li
14   
15 for i in range(10):
16     p = Process(target=foo,args=(i,))
17     p.start()
18      
19 print 'ending',li
进程之间数据不共享

          ······线程之间数据是共享的·············

      ·············进程数据不能共享(默认)············

~~~~~~~~~~~~~~~~~~~~~~~~~进程之间数据共享~~~~~~~~~~~~~~~~~~~~

 1 #方法一,Array
 2 from multiprocessing import Process,Array
 3 temp = Array('i', [11,22,33,44])
 4  
 5 def Foo(i):
 6     temp[i] = 100+i
 7     for item in temp:
 8         print i,'----->',item
 9  
10 for i in range(2):
11     p = Process(target=Foo,args=(i,))
12     p.start()
13  
14 #方法二:manage.dict()共享数据
15 from multiprocessing import Process,Manager
16  
17 manage = Manager()
18 dic = manage.dict()
19  
20 def Foo(i):
21     dic[i] = 100+i
22     print dic.values()
23  
24 for i in range(2):
25     p = Process(target=Foo,args=(i,))
26     p.start()
27     p.join()
进程数据共享py2.7(两个方法)

  5、进程池

    p  = Pool(5)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from  multiprocessing import Process,Pool
 4 import time
 5   
 6 def Foo(i):
 7     time.sleep(2)
 8     return i+100
 9   
10 def Bar(arg):
11     print arg
12   
13 pool = Pool(5)
14 #print pool.apply(Foo,(1,))
15 #print pool.apply_async(func =Foo, args=(1,)).get()
16   
17 for i in range(10):
18     pool.apply_async(func=Foo, args=(i,),callback=Bar)
19   
20 print 'end'
21 pool.close()
22 pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
进程池

 

~~~~~~~~~~~~进程池基础之apply和apply_async方法区别~~~~~~~~~~~~~~~~

    p.apply()  每一个任务是排队进行,进程.join()

    p.apply_async()  每一个任务并发进行,可以设置回调函数,进程无.join(),daemon=True

四、线程池的实现

  1、低配版线程池

 

   2、高配版线程池

       (1)、设计思路

  1 复制代码
  2 
  3 #!/usr/bin/env python
  4 # -*- coding:utf-8 -*-
  5 
  6 import queue
  7 import threading
  8 import contextlib
  9 import time
 10 
 11 StopEvent = object()
 12 
 13 
 14 class ThreadPool(object):
 15 
 16     def __init__(self, max_num, max_task_num = None):
 17         if max_task_num:
 18             self.q = queue.Queue(max_task_num)
 19         else:
 20             self.q = queue.Queue()
 21         self.max_num = max_num
 22         self.cancel = False
 23         self.terminal = False
 24         self.generate_list = []
 25         self.free_list = []
 26 
 27     def run(self, func, args, callback=None):
 28         """
 29         线程池执行一个任务
 30         :param func: 任务函数
 31         :param args: 任务函数所需参数
 32         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 33         :return: 如果线程池已经终止,则返回True否则None
 34         """
 35         if self.cancel:
 36             return
 37         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 38             self.generate_thread()
 39         w = (func, args, callback,)
 40         self.q.put(w)
 41 
 42     def generate_thread(self):
 43         """
 44         创建一个线程
 45         """
 46         t = threading.Thread(target=self.call)
 47         t.start()
 48 
 49     def call(self):
 50         """
 51         循环去获取任务函数并执行任务函数
 52         """
 53         current_thread = threading.currentThread()
 54         self.generate_list.append(current_thread)
 55 
 56         event = self.q.get()
 57         while event != StopEvent:
 58 
 59             func, arguments, callback = event
 60             try:
 61                 result = func(*arguments)
 62                 success = True
 63             except Exception as e:
 64                 success = False
 65                 result = None
 66 
 67             if callback is not None:
 68                 try:
 69                     callback(success, result)
 70                 except Exception as e:
 71                     pass
 72 
 73             with self.worker_state(self.free_list, current_thread):
 74                 if self.terminal:
 75                     event = StopEvent
 76                 else:
 77                     event = self.q.get()
 78         else:
 79 
 80             self.generate_list.remove(current_thread)
 81 
 82     def close(self):
 83         """
 84         执行完所有的任务后,所有线程停止
 85         """
 86         self.cancel = True
 87         full_size = len(self.generate_list)
 88         while full_size:
 89             self.q.put(StopEvent)
 90             full_size -= 1
 91 
 92     def terminate(self):
 93         """
 94         无论是否还有任务,终止线程
 95         """
 96         self.terminal = True
 97 
 98         while self.generate_list:
 99             self.q.put(StopEvent)
100 
101         self.q.queue.clear()
102 
103     @contextlib.contextmanager
104     def worker_state(self, state_list, worker_thread):
105         """
106         用于记录线程中正在等待的线程数
107         """
108         state_list.append(worker_thread)
109         try:
110             yield
111         finally:
112             state_list.remove(worker_thread)
113 
114 
115 
116 # How to use
117 
118 
119 pool = ThreadPool(5)
120 
121 def callback(status, result):
122     # status, execute action status
123     # result, execute action return value
124     pass
125 
126 
127 def action(i):
128     print(i)
129 
130 for i in range(30):
131     ret = pool.run(action, (i,), callback)
132 
133 time.sleep(5)
134 print(len(pool.generate_list), len(pool.free_list))
135 print(len(pool.generate_list), len(pool.free_list))
136 # pool.close()
137 # pool.terminate()
View Code

    2、上下文管理基础

 

    3、上下文管理之with自定义open

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8 
  9 StopEvent = object()
 10 
 11 
 12 class ThreadPool(object):
 13 
 14     def __init__(self, max_num, max_task_num = None):
 15         if max_task_num:
 16             self.q = queue.Queue(max_task_num)
 17         else:
 18             self.q = queue.Queue()
 19             # 最多创建的线程数(线程池最大容量)
 20         self.max_num = max_num
 21         self.cancel = False
 22         self.terminal = False
 23         # 真实创建的线程列表
 24         self.generate_list = []
 25         # 空闲线程数量
 26         self.free_list = []
 27 
 28     def run(self, func, args, callback=None):
 29         """
 30         线程池执行一个任务
 31         :param func: 任务函数
 32         :param args: 任务函数所需参数
 33         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 34         :return: 如果线程池已经终止,则返回True否则None
 35         """
 36         w = (func, args, callback,)
 37         self.q.put(w)
 38         # 把任务放在一个元组里
 39 
 40         if self.cancel:
 41             return
 42         # 如果没有空闲线程,且创建的线程数目小于线程池最大创建数目
 43         # 则创建线程
 44         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 45             self.generate_thread()
 46 
 47     def generate_thread(self):
 48         """
 49         创建一个线程
 50         """
 51         t = threading.Thread(target=self.call)
 52         t.start()
 53 
 54     def call(self):
 55         """
 56         循环去获取任务函数并执行任务函数
 57         """
 58         # 获取当前线程
 59         current_thread = threading.currentThread()
 60         self.generate_list.append(current_thread)
 61         # 去任务并执行
 62         event = self.q.get()
 63         while event != StopEvent:
 64             # 是任务
 65             # 解开任务包
 66             func, arguments, callback = event
 67             # 执行任务
 68             try:
 69                 result = func(*arguments)
 70                 success = True
 71             except Exception as e:
 72                 success = False
 73                 result = e
 74 
 75             if callback is not None:
 76                 try:
 77                     callback(success, result)
 78                 except Exception as e:
 79                     pass
 80             # 标记 空闲了
 81             with self.worker_state(self.free_list, current_thread):
 82                 if self.terminal:
 83                     event = StopEvent
 84                 else:
 85                     # 取任务
 86                     event = self.q.get()
 87         else:
 88             # 不是元组,不是任务
 89             self.generate_list.remove(current_thread)
 90 # 想要终止
 91     # 1、让正在从队列中取任务的线程挂掉
 92     # 2、主线程,你跳我就跳
 93 
 94     def close(self):
 95         """
 96         执行完所有的任务后,所有线程停止
 97         """
 98         self.cancel = True
 99         full_size = len(self.generate_list)
100         while full_size:
101             self.q.put(StopEvent)  # 给队列加终止符,有几个加几个
102             full_size -= 1
103 
104     def terminate(self):
105         """
106         无论是否还有任务,终止线程
107         """
108         self.terminal = True
109 
110         while self.generate_list:
111             self.q.put(StopEvent)
112 
113         self.q.queue.clear()
114 
115     @contextlib.contextmanager # 装饰器 处理上下文
116     def worker_state(self, state_list, worker_thread):
117         """
118         用于记录线程中正在等待的线程数
119         """
120         state_list.append(worker_thread)
121         try:
122             yield
123         finally:
124             state_list.remove(worker_thread)
125 
126 
127 pool = ThreadPool(5)
128 
129 def callback(status, result):
130     # status, execute action status
131     # result, execute action return value
132     pass
133 
134 
135 def action(i):
136     print(i)
137 
138 
139 for i in range(30):
140     # 将任务放在队列中
141     # 着手开始处理任务
142     #           -创建线程
143                 #   -有空闲线程,则不再创建线程
144                 #     -不能高于线程池的限制
145                 #     -根据任务个数判断
146 
147     #           -线程去队列中取任务
148     ret = pool.run(action, (i,), callback)
149 
150 time.sleep(5)
151 print(len(pool.generate_list), len(pool.free_list))
152 print(len(pool.generate_list), len(pool.free_list))
153 pool.close()
154 pool.terminate()
最终代码

五、 协程

  greenlet

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3  
 4  
 5 from greenlet import greenlet
 6  
 7  
 8 def test1():
 9     print 12
10     gr2.switch()
11     print 34
12     gr2.switch()
13  
14  
15 def test2():
16     print 56
17     gr1.switch()
18     print 78
19  
20 gr1 = greenlet(test1)
21 gr2 = greenlet(test2)
22 gr1.switch()

 1 import gevent
 2  
 3 def foo():
 4     print('Running in foo')
 5     gevent.sleep(0)
 6     print('Explicit context switch to foo again')
 7  
 8 def bar():
 9     print('Explicit context to bar')
10     gevent.sleep(0)
11     print('Implicit context switch back to bar')
12  
13 gevent.joinall([
14     gevent.spawn(foo),
15     gevent.spawn(bar),
16 ])

 遇到io操作自动切换:

 1 from gevent import monkey; monkey.patch_all()
 2 import gevent
 3 import urllib2
 4 
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = urllib2.urlopen(url)
 8     data = resp.read()
 9     print('%d bytes received from %s.' % (len(data), url))
10 
11 gevent.joinall([
12         gevent.spawn(f, 'https://www.python.org/'),
13         gevent.spawn(f, 'https://www.yahoo.com/'),
14         gevent.spawn(f, 'https://github.com/'),
15 ])
View Code

原文地址:https://www.cnblogs.com/minmin123/p/8732452.html