Python说文解字_Python之多任务_04

问:并发、并行、同步、异步、阻塞、非阻塞

答:

  并发、并行:

  并发是指一个时间段内(不是指的时间点),有几个程序在同一个CPU上运行,但是任意时刻只有一个程序在CPU上运行。对人类的时钟来说1秒钟能干很多事,但是计算机1秒钟运算上亿次。让我感觉是很多程序一起运行,其实是一个程序在运行。

  并行是指任意时刻点(这里这里是时刻点 ),有很多个程序同时(这里注意是同时 )在多个CPU上运行。

  如果一个CPU是四核,我们最高的并行是4核。

  同步、异步:(涉及到IO操作的时候要考虑的,这是属于消息操作的一种方式

  同步是指代码调用IO操作时,必须等待IO操作完成才能返回的调用方式。

  异步是指代码调用IO操作时,不必等待IO操作完成就返回的调用方法。(多线程就是典型的异步操作)

  阻塞、非阻塞:(涉及到IO操作的时候要考虑的,就是注意挂起的问题。这是属于函数调用的一种方式

  阻塞指的是调用函数时候当前线程被挂起

  非阻塞指调用函数的时候当前线程不会被挂起,而是立刻返回。

问:一个问题的提出:C10K问题是什么?

答:C10K问题是在1999年被提出来的技术挑战:如何一颗1GHz CPU,2G内存,1gbps网络环境下,让单台服务器同时为1万个客户端提供FTP服务。

  在早期的互联网用户非常少,不会考虑到并发的问题。一个线程只能处理一个Socket,如果用这种模式不能让一个服务器开启上万个客户的。

问:UNIX下的五种IO模型:

答:

  阻塞式IO:

  非阻塞式IO:

  IO多路复用

  信号驱动式IO:现在使用非常少:

  异步IO(POSIX的aio_系列函数):

  这五种IO模式是一个递进发展的关系。

问:阻塞式IO、非阻塞式IO:

答:比如之前的的socket编程就会遇到很多阻塞式IO

  1.client.connect(host,80)、client.recv(1024):

  就会遇到三次握手(关于三次握手四次挥手请参照:https://blog.csdn.net/li0978/article/details/52598121),这个过程实际上是阻塞的,如果当前这个网络连接不返回的话,会一直等待网络数据的返回。IO的操作时间和CPU的时间差距非常大,对CPU的利用率非常低的,网络中CPU的资源是非常重要的资源。时间浪费非常严重。

  比如我们可以设置client.setblocking(False)的话,connect会立刻返回。但是非阻塞式IO会带来一些问题。如果网路连接没有建立好,send会出问题的,这样就不停的询问连接是否建立好。connect阻塞不会消耗CPU的,但是我们要进行后续的操作,我们必须要确定connect是否连接好了,所以这里需要While True:循环一直询问。但是While循环会消耗CPU的。其实还不如block让它阻塞掉,但是如下下面的代码不依赖于连接,这种非阻塞式IO就非常有用的。可以转而去做其他的事情。

  内核:就是操作系统为了保护内存,保留一部分内存给操作系统用,比如我们在调用recvfrom函数是深入到操作系统的函数,再去请求我们的网络,然后在拷贝到应用程序的缓存地址里面。

问:继续上面的问题:我们将数据从内核复制到用户空间,告诉我们的程序准备好了呢?

答:这就是IO复用:

  select poll epoll是我们最常用的三种命令方式。

  select的方法其实也是一种阻塞的方法。但是和我们当时的While有很多的区别,他可以监听多个socket的状态。前面只能监听一个。监听多个给我们一个非常大的好处。是现在高并发技术应用的最多的点。但是从将数据从内核复制到用户的空间还是需要时间的。但是把很多步骤省略了。

问:信号驱动式IO

答:建立一个信号处理程序,是一种基于信号来的。但是现在应用非常少。

问:异步IO

答:aio开头的。这是真正的异步IO。他会将数据从内核复制到用户空间之后,再回发送信号处理程序处理数据报。是操作系统给我们准备好了之后再发送。

问:IO复用中的select  poll  epoll:

答:IO复用和异步IO是现在比较常用的技术。这三个都是IO多路复用的机制。IO多路复用就是通过一种机制、一个进程可以监视多个描述符,一旦某个描述符就绪(一般是就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步IO,因为它们都需要在读写时间就绪后自己负责进行读写,也就是说读写过程是阻塞的,而一部IO则无需自己负责进行读写,异步IO会负责把数据从内核拷贝到用户空间。

  因此:IO多路复用(同步IO) vs  异步IO,它们之间是这么一种关系。

问:select:

答:selcet函数监视的文件描述分三类:

  writefds

  readfds  

  exceptfds

  调用后,select函数会阻塞,知道有描述符就绪(有数据可读、可写或者有except)或者超时(timeout指定等待时间,如果立刻返回设为null即可),函数返回。当selcet函数返回后,可以遍历fdset,来找到就绪的描述符。

  selcet目前几乎在所有平台上支持,其良好的跨平台支持也是他的一个优点。selcet的一个缺点在于单个继承能够监视文件描述符的数量存在最大显示,在Linux上一般为1024,但是可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样荷叶灰造成效率的极低。

问:poll:

答:不同于selct使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。

  pollfd解耦股包含了要监视的event和发生的event,不再使用 select"参数-值"传递方式。同时pollfd并没有最大数量限制(但是数量过大后性能也是会下降的)。和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

  从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符的增长,其效率也会线性下降。

问:epoll:(在Linux下面支持,在Windows下面不支持的):

答:epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的实际存在到内核的一个事件表中,这样用户和内核控件的copy只需一次。epoll其实时使用了红黑树算法实现的。具体的红黑树算法见:https://www.cnblogs.com/skywang12345/p/3245399.html。epoll并不代表比select好。在并发高的情况下,连接活跃度不高,epoll比select好。并发不高的话,同时连接很活跃,select比epoll好。

问:相关的举例:

答:

  1.通过非阻塞IO实现http请求。

import socket
from urllib.parse import urlparse

# 通过非阻塞IO完成http请求
def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    client.setblocking(False)
    try:
        client.connect((host,80))  # 阻塞不会消耗CPU
    except BlockingIOError as e:
        pass

    # 不停的询问连接是否建立好,需要while循环不停的去检查状态
    # 做计算任务或再次发起连接请求。

    while True:
        try:
            client.send("GET {} HTTP/1.1
Host:{}
Connection:close

".format(path, host).encode('utf8'))
            break
        except OSError as e:
            pass


    data = b""
    while True:
        try:
            d = client.recv(1024)
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break

    data = data.decode("utf-8")
    html_data = data.split("

")[1]
    print(html_data)
    client.close()

if __name__ == '__main__':
    get_url("http://www.baidu.com")

  

  2. 通过IO复用的select的方法:

  我们这里使用selectors这个包的DefaultSelector的包,这个包比select包装更好的,而且选择poll方法和epoll方法会根据平台自动选择。还给我们提供了注册的机制。

import socket
from urllib.parse import urlparse
import select
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE

selector = DefaultSelector()

class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1
Host:{}
Connection:close

".format(self.path, self.host).encode('utf8'))
        selector.register(self.client.fileno(),EVENT_READ,self.readable)

    def readable(self,key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf-8")
            html_data = data.split("

")[1]
            print(html_data)
            self.client.close()

    def get_url(self,url):
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host,80))  # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass

        #!!!!注册!!!!
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)

def loop():
    # 时间循环:不停的情请求socket的状态并调用对应的回调函数。
    # 1. select 本身是不支持register模式的,
    # 2. socket状态编号以后的回调是由程序员完成的。
    while True:
        ready = selector.select()
        for key,mask in ready:
            call_back = key.data
            call_back(key)
    # 回调+时间循环+select(poll/epoll)

if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()

  运行这段代码的时候会提示:OSError: [WinError 10022] 提供了一个无效的参数。在Linux下面不会报错。

  因此我们在Windows底下添加两个全局变量进行更改,就不会抛异常了:

import socket
from urllib.parse import urlparse
import select
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE

selector = DefaultSelector()
urls = ["http://www.baidu.com"]
stop = False


class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1
Host:{}
Connection:close

".format(self.path, self.host).encode('utf8'))
        selector.register(self.client.fileno(),EVENT_READ,self.readable)

    def readable(self,key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf-8")
            html_data = data.split("

")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self,url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host,80))  # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass

        #!!!!注册!!!!
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)

def loop():
    # 时间循环:不停的情请求socket的状态并调用对应的回调函数。
    # 1. select 本身是不支持register模式的,
    # 2. socket状态编号以后的回调是由程序员完成的。
    while not stop:
        ready = selector.select()
        for key,mask in ready:
            call_back = key.data
            call_back(key)
    # 回调+时间循环+select(poll/epoll)

if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()

  这种方式的好处就是并发性高。

问:回调之痛?

答:回调会产生很多问题:

  如果回调函数执行不正常该如何?

  如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?

  如果嵌套了多层,其中某个环节出错了会造成什么后果?

  如果有一个数据需要被每个回调都处理怎么办?

  怎么使用当前函数中的局部变量?

  总结回调的问题有三个方面:

  1.可读性差

  2.共享状态管理困难

  3.异常处理困难

问:C10M问题?

答:C10M问题是随着互联网的飞速发展,如果利用八核CPU,64G内存,在10gbps的网络上保持10000并发并连接?C10K也满足不了我们了。因此这里就用到了协程了。

问:我们有什么样的处理思路?

答:回调模式有很多的缺点,协程就是为了编写难的问题。我们列举一下当前存在的问题:

  1.回调模式编码复杂度高

  2.同步编程的并发性不高

  3.多线程编程需要线程间同步,用的锁的机制。

  怎么解决?

  1.采用同步的方式去编写异步的代码

  2.使用单线程去切换任务:

    1.线程是由操作系统切换的,单线程切换意味着我们需要程序员自己去调度任务

    2.不再需要锁,并发性高。如果我们能在单线程直接切换就好像函数之间的调用一样,如果单线程内切换函数,性能远高于线程切换,而且它的并发性更高。如果我们声明1000个函数比声明1000个线程并发性越高。

  要实现这些对现有的编程模式有很大的挑战。

问:一个关于函数的问题,关于函数是否可以暂停:

答:比如我们有这么一段代码:

def get_url(url):
    # do something:
    html = get_html(url)  # 此处暂停,切换到另一个函数去执行
    # parse html
    urls = parse_url(html)

def get_url2():
    # do something:
    html = get_html(url)  # 此处暂停,切换到另一个函数去执行
    # parse html
    urls = parse_url(html)

  我们想一下:

  传统函数调用过程就是A执行完执行B执行完执行C。

  我们需要一个可以暂停的函数,并且可以在适当的时候回复该函数继续去执行。

  如果能够这两个,是不是可以玩儿了。因此这个地方就出现了协程

  协程有两个定义:有多个入口的函数或者说可以暂停的函数(可以向暂停的地方传入值),我们感觉到生成器就是一个可以暂停的概念。

问:还记得我们的生成器怎么用嘛?

答:我们用生成器对象,然后next去调用。生成器是使用了我们迭代协议的。

def gen_func():
    yield 1
    yield 2
    yield 3
    return "bobby"

if __name__ == '__main__':
    gen = gen_func() # 我们建立一个生成器对象
    print(next(gen))
    print(next(gen))
    print(next(gen))
    print(next(gen))
StopIteration: bobby

  1. 生成器不只可以产生值,还可以接收值  

def gen_func():
# 1.可以产出值,2.可以接收值(调用方船体进来的值)
html = yield "http://www.baidu.com"
print(html)
yield 2
yield 3
return "bobby"

if __name__ == '__main__':
gen = gen_func() # 我们建立一个生成器对象

# 1.启动生成器方法又两种,next和send
url = next(gen)
html = "bobby111"
print(gen.send(html))
# 2. send方法可以传递值,进入生成器内部,同时还可以重启生成器执行到下一个yield位置。
gen.send(html)

# bobby111
# 2

  将我们的值传递给生成器内部,这里要注意的地方:第一次调用send的时候不能send一个非None的值,跟前面的一样,首先要给生成器进行初始化对象。在调用send发送非None值之前,我们必须启动一次生成器,方式有两种:

  gen.send(None)

  next(gen)

  其中,html = yield 内容,跟我们说的一样,yield看做是return,把yield后面的值传递给html

def gen_func():
    yield "http://www.baidu.com"
    yield 2
    yield 3
    return "bobby"

if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    gen.close()
    next(gen)

    # 抛出异常:StopIteration:generator ignored GeneratotExit

  我们看到他会抛异常。GeneratorExit是继承自BaseException,Exception是比他们更基础的,如果Try Except的话 。gen.close()不会抛异常。close()向上抛关闭。

def gen_func():
    try:
        yield "http://www.baidu.com"
    except Exception as e:
        print(e)
    yield 2
    yield 3
    return "bobby"

if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    gen.throw(Exception,"download error")
    next(gen)
    # gen.throw(Exception,"download error")

    # http: // www.baidu.com
    # download
    # error

   另外,gen.throw的方式会抛第一个异常,需要处理,但是close不需要处理。

  我们上面介绍了生成器的新的内容send,close,throw。

  我们在介绍一个在Py3.3添加了yield from语法。

  首先介绍一下chain,这个是把一些可迭代对象,连接起来。用for循环进行遍历。

from itertools import chain

my_list = [1,2,3]
my_dict = {
    "bobby1":"http://www.baidu.com",
    "bobby2":"http://www,sina.com"
}

for value in chain(my_list,my_dict,range(5,10)):
    print(value)

    # 1
    # 2
    # 3
    # bobby1
    # bobby2
    # 5
    # 6
    # 7
    # 8
    # 9

   我们再来改写一下一个函数来实现这个chain

from itertools import chain

my_list = [1,2,3]
my_dict = {
    "bobby1":"http://www.baidu.com",
    "bobby2":"http://www,sina.com"
}

def my_chain(*args,**kwargs):
    for my_iterable in args:
        for value in my_iterable:
            yield value


for value in my_chain(my_list,my_dict,range(5,10)):
    print(value)

    # 1
    # 2
    # 3
    # bobby1
    # bobby2
    # 5
    # 6
    # 7
    # 8
    # 9

   这里就有yield from将代码进一步缩减:

yield from EXPR(可以简化)
1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw 和 .close方法。
2.如果自生成其支持.throw和close方法,但是子生成器内部,这两个方法都会抛出异常。
3.调用方让子生成器自己抛出异常。
4.当调用方使用next,send(None)函数,当调用方使用.send()发送非None值是,才调用子生成器.send方法。
from itertools import chain

my_list = [1,2,3]
my_dict = {
    "bobby1":"http://www.baidu.com",
    "bobby2":"http://www,sina.com"
}

def my_chain(*args,**kwargs):
    for my_iterable in args:
        yield from my_iterable
        # for value in my_iterable:
        #     yield value


for value in my_chain(my_list,my_dict,range(5,10)):
    print(value)

    # 1
    # 2
    # 3
    # bobby1
    # bobby2
    # 5
    # 6
    # 7
    # 8
    # 9

  再举一个例子:

def g1(iterable):
    yield iterable
def g2(iterable):
    yield from iterable

for value in g1(range(10)):
    print(value)
for value in g2(range(10)):
    print(value)


# 0
# 1
# 2
# 3
# 4
# 5
# 6
# 7
# 8
# 9

  

  再举一个最重要的例子:

def g1(gen):
    yield from gen

def main():
    g = g1()
    g.send(None)

   在这个例子中才是yield from最重要的应用,也是最核心的点,含义有两个方面:

  1. main为调用方,

  2. g1(委托生成器)

  3. gen(子生成器)

  4. yield from 会在调用方与子生成器之间一个双向通道。

  根据这个我们举一个yield from 比较详细的一个例子:

final_result = {}

def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        x = yield
        print(pro_name+"销量:",x)
        if not x:
            break
        total += x
        nums.append(x)
    return total,nums

def middle(key):
    while True:
        final_result[key] = yield from sales_sum(key)
        print(key+"销量统计完成!!")

def main():
    data_sets = {
        "bobby牌面膜":[1200,1500,3000],
        "bobby牌手机":[20,55,98,100],
        "bobby牌大一":[280,560,778,70]
    }
    for key,data_set in data_sets.items():
        m = middle(key)
        m.send(None) # 预激middle协程
        for value in data_set:
            m.send(value) # 给谢忱个传递每一组值
        m.send(None)
    print("final_result:",final_result)

if __name__ == '__main__':
    main()

# bobby牌面膜销量: 1200
# bobby牌面膜销量: 1500
# bobby牌面膜销量: 3000
# bobby牌面膜销量: None
# bobby牌面膜销量统计完成!!
# bobby牌手机销量: 20
# bobby牌手机销量: 55
# bobby牌手机销量: 98
# bobby牌手机销量: 100
# bobby牌手机销量: None
# bobby牌手机销量统计完成!!
# bobby牌大一销量: 280
# bobby牌大一销量: 560
# bobby牌大一销量: 778
# bobby牌大一销量: 70
# bobby牌大一销量: None
# bobby牌大一销量统计完成!!
# final_result: {'bobby牌面膜': (5700, [1200, 1500, 3000]), 'bobby牌手机': (273, [20, 55, 98, 100]), 'bobby牌大一': (1688, [280, 560, 778, 70])}

  运用这个模式,就不用try exception处理异常了。就变得非常的简单,不用做大量的try...exception了。yield from帮我们完成了很多的工作。所以记住上面的那种格式。

  如下我们对yield from做一个总结:

  1. 子生成器产生的值,都是直接传给调用方的:调用方通过.send()发送的值都是直接传递给子生成器的:如果发送的是None,会调用子生成器__next__()方法,如果不是None,会调用子生成器的.send()方法。

  2.子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;

  3.yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数。

  4.如果调用的时候出现了StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上“冒泡”;

  5.传入委托生成器的异常里,除了GenerationExit之外,其他的所有异常全部传递给子生成器的throw()方法,如果调用.throw()的是出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上“冒泡”;

  6.如果在委托生成器上调用.close()或传入GenerationExit异常,会调用自生成的.close()方法,没有的话就不会调用。如果在调用.close()的时候抛出了异常,那么就向上“冒泡”,否则委托生成器会抛出GenerationExit异常。

  

原文地址:https://www.cnblogs.com/noah0532/p/11017357.html