python自动华 (十)

Python自动化 【第十篇】:Python进阶-多进程/协程/事件驱动与SelectPollEpoll异步IO

本节内容:

  1. 多进程
  2. 协程
  3. 事件驱动与SelectPollEpoll异步IO

 

1.  多进程

  启动多个进程

  进程中启进程

  父进程与子进程

  进程间通信

  不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

a)   queues

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process, Queue

import queue

import threading

def f(qq):

    qq.put("hahaha123")

if __name__ == '__main__':

    #q = queue.Queue() #  线程queue不能直接传给子进程

    q = Queue()

    p = Process(target=f, args=(q,))

    #p = threading.Thread(target=f, args=(q,))

    p.start()

    print(q.get())

    p.join()
queues

  父进程克隆了一个Queue,将克隆的Queue交给了子进程,当一个Queue对数据进行修改时,会将修改后的Queue数据序列化到某一位置,另一个Queue会从这个位置反序列化获取数据,实现进程间的通信

b)   Pipes

  
#!/usr/bin/env python

# -*- coding:utf-8 -*- 

from multiprocessing import Process, Pipe

def f(conn):

    conn.send("qqqqqq")

    conn.send("qqqqqq2")

    print("from parent:", conn.recv())

    conn.close()

if __name__ == '__main__':

    parent_conn, chile_conn = Pipe()

    p = Process(target=f, args=(chile_conn,))

    p.start()

    print(parent_conn.recv())

    print(parent_conn.recv())

    parent_conn.send("hehehhe")

    p.join()
pipes

c)       Managers 实现进程间数据的共享,可以同时修改,而不是数据的传递

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
from multiprocessing import Process, Manager
import os
def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict() # 生成一个字典可在多个进程间共享和传递
        l = manager.list()# 生成一个列表可在多个进程间共享和传递
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l,))
            p.start()
            p_list.append(p)
        for res in p_list:# 等待结果
            res.join()
        print(d)
        print(l)
managers  

  进程同步

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process,Lock

def f(l, i):

    l.acquire()

    print("hello world", i)

    l.release()

if __name__ == '__main__':

    lock = Lock()

    for num in range(10):

        Process(target=f, args=(lock, num)).start()
进程同步 

  进程池(生产中常用)

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process, Pool, freeze_support

import time, os

def Foo(i):

    time.sleep(2)

    print("in process ", os.getpid())

    return i

def Bar(args):

    print("--->", args, os.getpid())

if __name__ == '__main__':

    #freeze_support()

    pool = Pool(processes=5)

    print("main_process:", os.getpid())

    for i in range(10):

        #pool.apply(func=Foo, args=(i,))

        #pool.apply_async(func=Foo, args=(i,))

        pool.apply_async(func=Foo, args=(i,),callback=Bar)

    print('end')

    pool.close()

    pool.join() #进程池中进程执行完毕后再关闭,注释后程序不等进程执行我那后就直接关闭了
进程池

 

2.  协程

  协程,微线程

  协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作的锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展+低成本(一个cpu可支持上万个协程)

     缺点:

  • 无法利用多核资源,需要和进程配合才能运行在多CPU上
  • 运行阻塞(blocking)操作会阻塞整个程序

  通过yield实现简单的协程(单线程实现多并发效果):

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
def consumer(name):
    print("----->starting")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        print("33[32;1m[producer]33[0m is making baozi %s" % n)
        con.send(n) #
        con2.send(n)
if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
yield

  greenlet实现协程手动切换

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
from greenlet import greenlet
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch()
    print(78)
    gr1.switch()
gr1 = greenlet(test1)#启动一个协程
gr2 = greenlet(test2)
gr1.switch()
greenlet 

  gevent实现协程自动切换

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import  gevent
def foo():
    print("1")
    gevent.sleep(1) # 模仿IO切换
    print("2")
def bar():
    print("3")
    gevent.sleep(0)
    print("4")
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar)
])
gevent

  协程大并发下载网页(urllib模块):

  通过gevent调用urllib默认是阻塞的,加入monkey模块,把所有的io操作加上标记实现并行操作

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
from gevent import monkey
from urllib import request
import gevent
monkey.patch_all() #把当前程序的所有的io操作单独做上标记
def f(url):
    print("Get: %s" % url)
    res = request.urlopen(url)
    data = res.read()
    print("%d bytes received from %s" % (len(data), url))
gevent.joinall([
    gevent.spawn(f, "https://www.python.org/"),
    gevent.spawn(f, "https://www.yahoo.com/"),
    gevent.spawn(f, "https://github.com/")
])
View Code

   通过gevent实现单线程下的socket并发

    Server:

    
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import sys
import socket
import time
import gevent
from gevent import socket, monkey
monkey.patch_all()
host = "0.0.0.0"
def server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(500)
    while True:
        conn, addr = s.accept()
        gevent.spawn(handle_request, conn) #前边是函数,后边是函数所需的参数
def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data.decode())
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as e:
        print("33[31;1merr33[0m", e)
    finally:
        conn.close()
if __name__ == '__main__':
    server(5566)
server

    Client:

    
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import socket
host = "localhost"
port = 5566
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    print("Recv:", repr(data))
s.close()
client

3.  事件驱动与异步IO

  通常,我们写服务器处理模型的程序时,有以下几种模型:

  (1)每收到一个请求,创建一个新的进程,来处理该请求;

  (2)每收到一个请求,创建一个新的线程,来处理该请求;

  (3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求

  上面的几种方式,各有千秋,

  第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。

  第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。

  第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。

  综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式

  事件驱动模型:

  事件驱动大体思路:

  a)有一个事件(消息)队列

  b)鼠标按下时, 往这个队列中增加一个点击事件(消息)

  c)有个循环,部队从队列取出事件,根据不同的事件,调用不同的函数

  d)事件(消息)一般各自保存各自的处理函数指针,这样,每个事件都有独立的处理函数

  事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

  当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

  • 程序中有许多任务,而且…
  • 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
  • 在等待事件到来时,某些任务会阻塞。

  当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

  网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

      

  SelectPollEpoll异步IO

  select  1)最多能维护1024个socket  2)不知道具体哪个socket反回了数据

  poll     去掉了默认文件链接数的限制

  epoll   依然是io多路复用,(tornado)、(twisted)都用epoll, windows不支持epoll  可以知道哪个socket有数据,省去循环,此时数据还存放在内核态,需要用户主动调用接收数据操作,如果这次用户没取数据,下次还继续通知数据来了(水平触发)

(一) 概念说明

- 用户空间和内核空间
- 进程的阻塞
- 缓存 I/O

  现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

  为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

保存处理机上下文,包括程序计数器和其他寄存器。

  • 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
  • 更新内存管理的数据结构。
  •   进程的阻塞

  文件描述符fd

  文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

  缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

  数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:

  • 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

  - 阻塞 I/O(blocking IO)
  - I/O 多路复用( IO multiplexing)
  - 异步 I/O(asynchronous IO)

  阻塞 I/O(blocking IO)

  所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

  linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

  当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

  I/O 多路复用( IO multiplexing)

  所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

  所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

1

select(rlist, wlist, xlist, timeout=None)

  select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

  select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但 是这样也会造成效率的降低。

  poll

  int poll (struct pollfd *fds, unsigned int nfds, int timeout);

  不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

  struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

  pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

  从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

  epoll

  epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

  一 epoll操作过程

  epoll操作过程需要三个接口,分别如下:

  int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
  int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  1)int epoll_create(int size);
  创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

  2)int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  函数是对指定描述符fd执行op操作。
  - epfd:是epoll_create()的返回值。
  - op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
  - fd:是需要监听的fd(文件描述符)
  - epoll_event:是告诉内核需要监听什么事

  3)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  等待epfd上的io事件,最多返回maxevents个事件。
  参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

  select方法(实现socketserver)

  传递给  select  的参数是这么几个列表,分别表示读事件、写事件和错误事件。 select  方法返回三个列表,其中包含满足条件的对象(读、写和异常)。      

  select 多并发socket 例子

        server

    
          #!/usr/bin/python

# -*- coding:utf-8 -*-

# Author:zhoujunlong

import select

import socket

import queue

server = socket.socket()

server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)

server.bind(server_addr)

server.listen(5)

inputs = [server, ] #自己也要监测呀,因为server本身也是个fd

outputs = []

message_queues = {}

while True:

    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里

    for s in readable: #每个s就是一个socket

        if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,

            #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀

            #新连接进来了,接受这个连接

            conn, client_addr = s.accept()

            print("new connection from",client_addr)

            conn.setblocking(0)

            inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接

            #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到

            #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的

            message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送

        else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了

            #客户端的数据过来了,在这接收

            data = s.recv(1024)

            if data:

                print("收到来自[%s]的数据:" % s.getpeername()[0], data)

                message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端

                if s not  in outputs:

                    outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端

            else:#如果收不到data代表什么呢? 代表客户端断开了呀

                print("客户端断开了",s)

                if s in outputs:

                    outputs.remove(s) #清理已断开的连接

                inputs.remove(s) #清理已断开的连接

                del message_queues[s] ##清理已断开的连接

    for s in writeable:

        try :

            next_msg = message_queues[s].get_nowait()

        except queue.Empty:

            print("client [%s]" %s.getpeername()[0], "queue is empty..")

            outputs.remove(s)

        else:

            print("sending msg to [%s]"%s.getpeername()[0], next_msg)

            s.send(next_msg.upper())

    for s in exeptional:

        print("handling exception for ",s.getpeername())

        inputs.remove(s)

        if s in outputs:

            outputs.remove(s)

        s.close()

        del message_queues[s]
server

    client

    
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
client

  selectors模块

  
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
selectors模块
原文地址:https://www.cnblogs.com/honey01/p/7201604.html