20、Python之进程,协程,I/O

一、进程

    进程的使用与线程基本差不多,但由于进程之间的资源是无法共享的,从而引发出进程同步,进程通信等一系列的概率,首先我们来看一下,python中创建进程的2中方法。

1、直接调用

1 import multiprocessing
2 
3 def run(n):
4     print("process %s is runing" % n)
5 if __name__ == '__main__':
6     p = multiprocessing.Process(target=run,args=(1,))
7     p.start()
View Code

2、集成式调用

 1 import multiprocessing
 2 
 3 class MyProcess(multiprocessing.Process):
 4     def __init__(self,process_name):
 5         self.__process_name = process_name
 6         super(MyProcess,self).__init__()
 7     def run(self):
 8         print("%s is running" % self.__process_name)
 9 if __name__ == '__main__':
10     p = MyProcess("myprocess")
11     p.start()
View Code

关于进程的其他用法与线程基本一样,这里不再啰嗦。

    进程间的通信

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

1、进程Queue用法与线程的使用一样,但该为进程的Queue,与线程的Queue不同

 1 #Author gwx
 2 import multiprocessing
 3 from multiprocessing import Queue
 4 # from queue import Queue #这个导入的线程的队列
 5 class MyProcess(multiprocessing.Process):
 6     def __init__(self,process_name,q):
 7         self.__process_name = process_name
 8         self.__q = q
 9         super(MyProcess,self).__init__()
10     def run(self):
11         self.__q.put(self.__process_name)
12 
13 if __name__ == '__main__':
14     q = Queue()
15     q.put("mainprocess")
16     p1 = MyProcess("myprocess",q)
17     p2 = MyProcess("yourprocess",q)
18     p1.start()
19     p2.start()
20     p1.join()
21     p2.join()
22     while q.qsize() > 0:
23         print(q.get())
View Code

线程的queue是可以随时访问的,但进程的不可以的,所以进程的Queue是专门解决这个问题的,其本质上如图所示。

 进程1和进程2进行操作queue时,都会取第三方内存中queue的数据,操作完之后再将数据的备份放回第三方内存中,本质上是以第三方的队列做一个桥梁,咱们后面学习到的rabbitMQ也是这个原理。

2、管道(Pipe)。Pipe也可以用于进程之间的通信,需要通信的2个进程分别位于管道的2端,通过send和recv方法进行发收数据。

 1 from multiprocessing import Process,Pipe
 2 
 3 def run(conn):
 4     conn.send("你好,我是子进程")
 5     print(conn.recv())
 6 if __name__ == '__main__':
 7     parent_conn,child_conn = Pipe()
 8     parent_conn.send("你好我是父进程")
 9     p = Process(target=run,args=(child_conn,))
10     p.start()
11     p.join()
12     print(parent_conn.recv())
View Code

3、Managers。该类也能用于进程之间的通信,用法如下:

 1 from multiprocessing import Process,Manager
 2 import os
 3 def run(d,l):
 4     d[os.getpid()] = os.getpid()
 5     l.append(os.getpid())
 6 if __name__ == '__main__':
 7     with Manager() as manager:
 8         manager = Manager()
 9         d = manager.dict()
10         l = manager.list()
11 
12         p_list = []
13         for i in range(10):
14             p = Process(target=run,args=(d,l))
15             p.start()
16             p_list.append(p)
17         for pp in p_list:
18             pp.join()
19         print(d)
20         print(l)
View Code

    进程同步

我们说进程之间的内存空间是不可以相互访问的,所以对于内存空间而言,进程之前是不存在加锁这一说法的,但是对于I/O而言,它对于所有进程来说是共享资源,需要加锁,当然这里的锁引用的也是进程的锁而不是线程的锁,案例如下:

 1 from multiprocessing import Process,Lock
 2 
 3 def run(l):
 4     l.acquire()
 5     print("hello multiprocessing")
 6     l.release()
 7 if __name__ == '__main__':
 8     l = Lock()
 9     for i in range(10):
10         p = Process(target=run,args=(l,))
11         p.start()
View Code

    进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply  
  • apply_async
 1 from multiprocessing import Process,Pool
 2 import time
 3 def run(i):
 4     print("hello multiprocessing",i)
 5     time.sleep(1)
 6 if __name__ == '__main__':
 7     pool = Pool(5)
 8     for i in range(10):
 9         # pool.apply(run,args=(i,))
10         pool.apply_async(run,args=(i,))
11     pool.close()
12     pool.join()  #这块不知道为啥
View Code

二、协程

     协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
    •   "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

一个标准的协程应该符合以下几点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程

    Greenlet:greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,使用方法如下:

 1 from greenlet import greenlet
 2 
 3 def test1():
 4     print("11111111")
 5     gr2.switch()
 6     print("33333333")
 7     gr2.switch()
 8 def test2():
 9     print("22222222")
10     gr1.switch()
11     print("44444444")
12 
13 gr1 = greenlet(test1)
14 gr2 = greenlet(test2)
15 gr1.switch()
View Code

看上面的例子发现,greenlet不能实现自动切换,每次都需要switch来进行切换。

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。使用方法如下:

 1 import gevent
 2 
 3 def test1():
 4     print("11111111")
 5     gevent.sleep(2)
 6     print("33333333")
 7 def test2():
 8     print("22222222")
 9     gevent.sleep(1)
10     print("44444444")
11 
12 gevent.joinall([gevent.spawn(test1),gevent.spawn(test2)])
View Code

      joinall就是等待多个协程执行完毕。

     使用协程对性能的提升情况,看下面这段代码。

 1 import gevent
 2 
 3 def task(pid):
 4     gevent.sleep(0.5)
 5     print('Task %s done' % pid)
 6 
 7 def synchronous():
 8     for i in range(1, 10):
 9         task(i)
10 
11 
12 def asynchronous():
13     threads = [gevent.spawn(task, i) for i in range(10)]
14     gevent.joinall(threads)
15 
16 
17 print('Synchronous:')
18 synchronous()
19 
20 print('Asynchronous:')
21 asynchronous()
View Code

    一个简单的爬虫小程序

 1 from urllib import request
 2 import gevent
 3 from gevent import monkey
 4 
 5 monkey.patch_all() #把当前所有的程序的io操作给我单独的做上记号
 6 def f(url):
 7     print("Get:%s" % url)
 8     resp = request.urlopen(url)
 9     with open("url.html","wb") as file:
10         for line in resp.readlines():
11             file.write(line)
12 
13 gevent.joinall([
14     gevent.spawn(f,"http://www.cnblogs.com/win0211/category/1154652.html"),
15     gevent.spawn(f,"http://www.cnblogs.com/win0211/p/8549921.html")
16               ])
View Code

    使用多协程实现socket并发。

服务端代码:

 1 import socket
 2 from gevent import monkey
 3 import gevent
 4 
 5 monkey.patch_all()#告诉编译器采用多协程的方式处理
 6 def sever_handle(conn):
 7     while True:
 8         data = conn.recv(1024)
 9         print(data)
10         conn.send(data)
11     conn.close()
12 
13 server = socket.socket()
14 server.bind(("localhost",100))
15 while True:
16     server.listen()
17     conn,addre = server.accept()
18     gevent_handle = gevent.spawn(sever_handle,conn)#来一个连接,开启一个协程
19     # gevent.joinall() 因为这里是while True所以不需要joinall
View Code

客户端代码:

 1 import socket,threading,time
 2 def client_01(i):
 3     time.sleep(2)
 4     client = socket.socket()
 5     client.connect(("localhost",100))
 6     while True:
 7         data = input(">>")
 8         client.send(data.encode("utf-8"))
 9         data = client.recv(1023)
10         print(data)
11     client.close()
12 
13 client_01(1)
View Code

三、I/O

    对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

 

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得很少。先看一下它的流程:

使用select实现I/O多路复用(socket服务端)。

 1 import select,socket,queue
 2 
 3 server = socket.socket()
 4 
 5 server.bind(("localhost",999))
 6 
 7 server.listen(10000)
 8 
 9 server.setblocking(False) #设置为不阻塞
10 inputs = []
11 outputs = []
12 msg_dict = {}
13 inputs.append(server)#将service加入
14 
15 while True:
16     print("...........................")
17     readable,writeable,exceptional = select.select(inputs,outputs,inputs)#如果没有数据 则阻塞
18     for r in readable:
19         if r is server:
20             print("来了一个新连接")
21             conn,address = r.accept()
22             conn.setblocking(False)
23             inputs.append(conn)
24             msg_dict[conn] = queue.Queue()
25         else:
26             print("准备收数据")
27             data = r.recv(1024)
28             print(data)
29             msg_dict[r].put(data)
30             outputs.append(r)
31     for w in writeable:   #
32         print("准备发数据")
33         w.send(msg_dict[w].get())
34         outputs.remove(w)
35     for e in exceptional:
36         print("出错")
37         if e in outputs:
38             outputs.remove(e)
39         inputs.remove(e)
40         del msg_dict[e]
41 
42 server.close()
View Code

使用selectors实现I/O多路复用(socket服务端),selectors是对select的封装,功能更强大。实例代码如下:

 1 import  selectors #封装了select的操作
 2 import socket
 3 
 4 sel = selectors.DefaultSelector()#进行默认配置
 5 
 6 server = socket.socket()
 7 server.bind(("localhost",999))
 8 server.setblocking(False)
 9 server.listen(10000)
10 def read(conn,mask):
11     print("开始接收数据")
12     data = conn.recv(1024)
13     print(data)
14     conn.send(data)
15     conn.setblocking(False)
16     if not data:#客户端断开 将连接从sel删除
17         print("断开连接")
18         conn.close()
19         sel.unregister(conn)#
20 
21 def accept(server,mask):
22     print("新开一个连接")
23     conn,addr = server.accept()
24     sel.register(conn,selectors.EVENT_READ,read)
25 
26 sel.register(server,selectors.EVENT_READ,accept)#注册服务,关联文件句柄和回调函数
27 print("server",server)
28 print(accept,read)
29 while True:
30     try:
31         events = sel.select()#阻塞 是否有活动的事件
32         print(events)
33         for key, mask in events:  #
34             # callback = key.data  # key.data为accept或者read的地址 key.fileobj为连接地址
35             # callback(key.fileobj, mask)
36             key.data(key.fileobj,mask)#上面2行代码等同于这一行
37     except OSError as e:
38         print(e)
View Code
原文地址:https://www.cnblogs.com/win0211/p/8592316.html