线程和进程

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 """
  4 import time
  5 import threading
  6 
  7 def f0(a1,a2):
  8     print("f1")
  9     a = a1 +a2
 10     print(a)
 11 
 12 
 13 def f1(a1,a2):
 14     time.sleep(5)
 15     f0(a1,a2)
 16 
 17 t = threading.Thread(target= f1,args= (123,456))
 18 t.setDaemon(True)
 19 t.start()
 20 t = threading.Thread(target= f1,args= (123,456))
 21 t.start()
 22 t = threading.Thread(target= f1,args= (123,456))
 23 t.start()
 24 
 25 
 26 """
 27 
 28 # python线程
 29 """
 30 import threading
 31 import time
 32 
 33 
 34 def show(arg):
 35     time.sleep(1)
 36     print('thread' + str(arg))
 37 
 38 
 39 for i in range(10):
 40     t = threading.Thread(target=show, args=(i,))
 41     t.start()
 42 
 43 print('main thread stop')
 44 
 45 """
 46 """
 47 import threading
 48 import time
 49 
 50 
 51 class MyThread(threading.Thread):
 52     def __init__(self, num):
 53         threading.Thread.__init__(self)
 54         self.num = num
 55 
 56     def run(self):  # 定义每个线程要运行的函数
 57 
 58         print("running on number:%s" % self.num)
 59 
 60         time.sleep(3)
 61 
 62 
 63 if __name__ == '__main__':
 64     t1 = MyThread(1)
 65     t2 = MyThread(2)
 66     t1.start()
 67     t2.start()
 68 
 69 """
 70 # 线程的阻塞和释放
 71 """
 72 import threading
 73 import time
 74 def do(event):
 75     print("start")
 76     event.wait()
 77     print("execute")
 78 
 79 event_obj = threading.Event()
 80 for i in range(10):
 81     t = threading.Thread(target= do,args= (event_obj,))
 82     t.start()
 83 
 84 event_obj.clear()
 85 inp = input("input")
 86 if inp == "true":
 87     event_obj.set()
 88 
 89 """
 90 
 91 # 创建进程
 92 """
 93 import multiprocessing
 94 import time
 95 
 96 def f1(a1):
 97     time.sleep(2)
 98     print(a1)
 99 if __name__ == "__main__":
100 
101     t = multiprocessing.Process(target= f1,args= (11,))
102     t.start()
103 
104     t = multiprocessing.Process(target= f1,args= (11,))
105     t.start()
106     print('end')
107 
108 # 2、daemon
109 # 若果 False
110 # True
111 
112 """
113 
114 # from multiprocessing import Process
115 # from multiprocessing import Manager
116 #
117 # import time
118 #
119 # li = []
120 #
121 #
122 # def foo(i):
123 #     li.append(i)
124 #     print('say hi', li)
125 #
126 # if __name__ == '__main__':
127 #     for i in range(10):
128 #         p = Process(target=foo, args=(i,))
129 #         p.start()
130 #
131 #     print('ending', li)
132 
133 
134 """
135 from  threading import Thread
136 import time
137 li = []
138 def f1(i):
139     li.append(i)
140     print("你好",li)
141 for i in range(10):
142     t = Thread(target= f1,args=(i,))
143     t.start()
144 print("ending",li)
145 """
146 
147 # 实现线程和进程的互通
148 """
149 from multiprocessing import Process,Array
150 
151 
152 temp = Array('i',[11,22,33,44])#Array是一个数组
153 
154 def Foo(i):
155     temp[i] = 100+i
156     for item in temp:
157         print(i,"---->",item)
158 
159 if __name__ == "__main__":
160     for i in range(2):
161         p = Process(target = Foo,args =(i,))
162         p.start()
163 
164 """
165 
166 # Manager进程和进程之间的参合
167 """
168 import time
169 from multiprocessing import Process,Manager
170 def Foo(i,dic):
171     dic[i] = 100+i
172     print(len(dic))
173 
174 if __name__ == '__main__':
175     manage = Manager()
176     dic =manage.dict()
177     for i in range(2):
178         p = Process(target= Foo,args= (i,dic,))
179         p.start()
180         # p.join()
181     time.sleep(2)
182 
183 """
线程杂谈
1、Threading 用于提供线程相关的操作,线城是应用程序中工作的最小单元,代码如下
 1 import threading
 2 import time
 3 def show(arg):
 4     time.sleep(1)
 5     print("thread " + str(arg))
 6 
 7 for i in range(10):
 8     t = threading.Thread(target= show,args= (i,))
 9     t.start()
10 
11 
12 print("main thread stop")
Threading
2、自定义线程类
 1 import threading
 2 import time
 3 
 4 
 5 class MyThread(threading.Thread):
 6     def __init__(self, num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9 
10     def run(self):  # 定义每个线程要运行的函数
11 
12         print("running on number:%s" % self.num)
13 
14         time.sleep(3)
15 
16 
17 if __name__ == '__main__':
18     t1 = MyThread(1)
19     t2 = MyThread(2)
20     t1.start()
21     t2.start()
View Code

3、线程锁

 1 # 未使用锁时如下
 2 """
 3 import threading
 4 import time
 5 
 6 gl_num = 0
 7 def show(a):
 8     global gl_num
 9     time.sleep(1)
10     gl_num += 1
11     print(gl_num)
12 
13 if __name__ == "__main__":
14     for i in range(10):
15         t = threading.Thread(target= show,args= (i,))
16         t.start()
17 
18 print("main thread stop")
19 """
20 # 用锁后
21 """
22 import threading
23 import time
24 gl_num = 0
25 
26 lock = threading.RLock()
27 def Func():
28     lock.acquire()
29     global gl_num
30     gl_num +=1
31     time.sleep(0.5)
32     print(gl_num)
33     lock.release()
34 
35 for i in range(10):
36     t = threading.Thread(target=Func,)
37     t.start()
View Code

4、事件(event)

python 线程的时间主要用于主线程控制其他线程的执行,
事件主要提供了三个方法 set,wait,clear

事件处理的机制:全局定义了一个'Flag',如果'Flag'的值为False
,那么当程序执行event。wait方法时就会阻塞,如果Flag的值为True ,那么遇到even.wait
时便不会阻塞
clear:将'Flag'设置为False
set :将'Flag'设置为True
 1 import threading
 2 def do(event):
 3     print("阻塞前")
 4     event.wait()
 5     print("阻塞后")
 6 
 7 event_obj = threading.Event()
 8 for i in range(10):
 9     t = threading.Thread(target= do,args=(i,))
10     t.start()
11 
12 event_obj.clear()
13 inp = input("input")
14 if inp == True:
15     event_obj.set()
View Code

5、进程池

 1 from multiprocessing import Pool
 2 import time
 3 def f1(w):
 4     time.sleep(1)
 5     return 100
 6 
 7 def f2(arg):
 8     print(arg)
 9 
10 if __name__ == "__main__":
11     pool = Pool(5)
12     for i in range(10):
13         pool.apply_async(func= f1,args=(i,),callback= f2)
14         print("主线程运行")
15 
16     pool.close()
17     pool.join()
View Code

6、一个简单的线程池

 1 import queue
 2 import  threading
 3 import time
 4 
 5 class ThreadPool(object):
 6 
 7     def __init__(self,max_num):
 8         self.queue = queue.Queue(max_num)#创建一个队列
 9         for i in range(max_num):
10             self.queue.put(threading.Thread)
11 
12     def get_thread(self):
13         return self.queue.get()
14 
15     def add_thread(self):
16         self.queue.put(threading.Thread)
17 
18 def func(pool,a1):
19     time.sleep(1)
20     print(a1)
21     pool.add_thread()
22     
23 
24 p = ThreadPool(10)
25 for i in range(100):
26     thread = p.get_thread()
27     t = thread(target = func,args = (p,i))#创建了一个线程
28 
29     t.start()
线程池简单

7、复杂的线程池

  1 import queue
  2 import threading
  3 import contextlib
  4 import time
  5 
  6 StopEvent = object()
  7 
  8 class ThreadPool(object):
  9 
 10     def __init__(self, max_num, max_task_num = None):
 11         if max_task_num:
 12             self.q = queue.Queue(max_task_num)
 13         else:
 14             self.q = queue.Queue()
 15         self.max_num = max_num
 16         self.cancel = False
 17         self.terminal = False
 18         self.generate_list = []
 19         self.free_list = []
 20 
 21     def run(self, func, args, callback=None):
 22         """
 23         线程池执行一个任务
 24         :param func: 任务函数
 25         :param args: 任务函数所需参数
 26         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 27         :return: 如果线程池已经终止,则返回True否则None
 28         """
 29         if self.cancel:
 30             return
 31         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 32             self.generate_thread()
 33         w = (func, args, callback,)
 34         self.q.put(w)
 35 
 36     def generate_thread(self):
 37         """
 38         创建一个线程
 39         """
 40         t = threading.Thread(target=self.call)
 41         t.start()
 42 
 43     def call(self):
 44         """
 45         循环去获取任务函数并执行任务函数
 46         """
 47         current_thread = threading.currentThread()
 48         self.generate_list.append(current_thread)
 49 
 50         event = self.q.get()
 51         while event != StopEvent:
 52 
 53             func, arguments, callback = event
 54             try:
 55                 result = func(*arguments)
 56                 success = True
 57             except Exception as e:
 58                 success = False
 59                 result = None
 60 
 61             if callback is not None:
 62                 try:
 63                     callback(success, result)
 64                 except Exception as e:
 65                     pass
 66 
 67             with self.worker_state(self.free_list, current_thread):
 68                 if self.terminal:
 69                     event = StopEvent
 70                 else:
 71                     event = self.q.get()
 72         else:
 73 
 74             self.generate_list.remove(current_thread)
 75 
 76     def close(self):
 77         """
 78         执行完所有的任务后,所有线程停止
 79         """
 80         self.cancel = True
 81         full_size = len(self.generate_list)
 82         while full_size:
 83             self.q.put(StopEvent)
 84             full_size -= 1
 85 
 86     def terminate(self):
 87         """
 88         无论是否还有任务,终止线程
 89         """
 90         self.terminal = True
 91 
 92         while self.generate_list:
 93             self.q.put(StopEvent)
 94 
 95         self.q.queue.clear()
 96 
 97     @contextlib.contextmanager
 98     def worker_state(self, state_list, worker_thread):
 99         """
100         用于记录线程中正在等待的线程数
101         """
102         state_list.append(worker_thread)
103         try:
104             yield
105         finally:
106             state_list.remove(worker_thread)
107 # How to use
108 pool = ThreadPool(5)
109 
110 def callback(status, result):
111     # status, execute action status
112     # result, execute action return value
113     pass
114 
115 def action(i):
116     print(i)
117 
118 for i in range(30):
119     ret = pool.run(action, (i,), callback)
120 
121 time.sleep(5)
122 print(len(pool.generate_list), len(pool.free_list))
123 print(len(pool.generate_list), len(pool.free_list))
124 # pool.close()
125 # pool.terminate()
线程池

8、协程

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

greenlet

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3  
 4  
 5 from greenlet import greenlet
 6  
 7  
 8 def test1():
 9     print 12
10     gr2.switch()
11     print 34
12     gr2.switch()
13  
14  
15 def test2():
16     print 56
17     gr1.switch()
18     print 78
19  
20 gr1 = greenlet(test1)
21 gr2 = greenlet(test2)
22 gr1.switch()

gevent

import gevent

 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
 
 
遇到IO操作自动切换
 
 1 from gevent import monkey; monkey.patch_all()
 2 import gevent
 3 import urllib2
 4 
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = urllib2.urlopen(url)
 8     data = resp.read()
 9     print('%d bytes received from %s.' % (len(data), url))
10 
11 gevent.joinall([
12         gevent.spawn(f, 'https://www.python.org/'),
13         gevent.spawn(f, 'https://www.yahoo.com/'),
14         gevent.spawn(f, 'https://github.com/'),
15 ])





原文地址:https://www.cnblogs.com/ouyang99-/p/9188109.html