问:并发、并行、同步、异步、阻塞、非阻塞
答:
并发、并行:
并发是指一个时间段内(不是指的时间点),有几个程序在同一个CPU上运行,但是任意时刻只有一个程序在CPU上运行。对人类的时钟来说1秒钟能干很多事,但是计算机1秒钟运算上亿次。让我感觉是很多程序一起运行,其实是一个程序在运行。
并行是指任意时刻点(这里这里是时刻点 ),有很多个程序同时(这里注意是同时 )在多个CPU上运行。
如果一个CPU是四核,我们最高的并行是4核。
同步、异步:(涉及到IO操作的时候要考虑的,这是属于消息操作的一种方式)
同步是指代码调用IO操作时,必须等待IO操作完成才能返回的调用方式。
异步是指代码调用IO操作时,不必等待IO操作完成就返回的调用方法。(多线程就是典型的异步操作)
阻塞、非阻塞:(涉及到IO操作的时候要考虑的,就是注意挂起的问题。这是属于函数调用的一种方式)
阻塞指的是调用函数时候当前线程被挂起。
非阻塞指调用函数的时候当前线程不会被挂起,而是立刻返回。
问:一个问题的提出:C10K问题是什么?
答:C10K问题是在1999年被提出来的技术挑战:如何一颗1GHz CPU,2G内存,1gbps网络环境下,让单台服务器同时为1万个客户端提供FTP服务。
在早期的互联网用户非常少,不会考虑到并发的问题。一个线程只能处理一个Socket,如果用这种模式不能让一个服务器开启上万个客户的。
问:UNIX下的五种IO模型:
答:
阻塞式IO:
非阻塞式IO:
IO多路复用
信号驱动式IO:现在使用非常少:
异步IO(POSIX的aio_系列函数):
这五种IO模式是一个递进发展的关系。
问:阻塞式IO、非阻塞式IO:
答:比如之前的的socket编程就会遇到很多阻塞式IO
1.client.connect(host,80)、client.recv(1024):
就会遇到三次握手(关于三次握手四次挥手请参照:https://blog.csdn.net/li0978/article/details/52598121),这个过程实际上是阻塞的,如果当前这个网络连接不返回的话,会一直等待网络数据的返回。IO的操作时间和CPU的时间差距非常大,对CPU的利用率非常低的,网络中CPU的资源是非常重要的资源。时间浪费非常严重。
比如我们可以设置client.setblocking(False)的话,connect会立刻返回。但是非阻塞式IO会带来一些问题。如果网路连接没有建立好,send会出问题的,这样就不停的询问连接是否建立好。connect阻塞不会消耗CPU的,但是我们要进行后续的操作,我们必须要确定connect是否连接好了,所以这里需要While True:循环一直询问。但是While循环会消耗CPU的。其实还不如block让它阻塞掉,但是如下下面的代码不依赖于连接,这种非阻塞式IO就非常有用的。可以转而去做其他的事情。
内核:就是操作系统为了保护内存,保留一部分内存给操作系统用,比如我们在调用recvfrom函数是深入到操作系统的函数,再去请求我们的网络,然后在拷贝到应用程序的缓存地址里面。
问:继续上面的问题:我们将数据从内核复制到用户空间,告诉我们的程序准备好了呢?
答:这就是IO复用:
select poll epoll是我们最常用的三种命令方式。
select的方法其实也是一种阻塞的方法。但是和我们当时的While有很多的区别,他可以监听多个socket的状态。前面只能监听一个。监听多个给我们一个非常大的好处。是现在高并发技术应用的最多的点。但是从将数据从内核复制到用户的空间还是需要时间的。但是把很多步骤省略了。
问:信号驱动式IO
答:建立一个信号处理程序,是一种基于信号来的。但是现在应用非常少。
问:异步IO
答:aio开头的。这是真正的异步IO。他会将数据从内核复制到用户空间之后,再回发送信号处理程序处理数据报。是操作系统给我们准备好了之后再发送。
问:IO复用中的select poll epoll:
答:IO复用和异步IO是现在比较常用的技术。这三个都是IO多路复用的机制。IO多路复用就是通过一种机制、一个进程可以监视多个描述符,一旦某个描述符就绪(一般是就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步IO,因为它们都需要在读写时间就绪后自己负责进行读写,也就是说读写过程是阻塞的,而一部IO则无需自己负责进行读写,异步IO会负责把数据从内核拷贝到用户空间。
因此:IO多路复用(同步IO) vs 异步IO,它们之间是这么一种关系。
问:select:
答:selcet函数监视的文件描述分三类:
writefds
readfds
exceptfds
调用后,select函数会阻塞,知道有描述符就绪(有数据可读、可写或者有except)或者超时(timeout指定等待时间,如果立刻返回设为null即可),函数返回。当selcet函数返回后,可以遍历fdset,来找到就绪的描述符。
selcet目前几乎在所有平台上支持,其良好的跨平台支持也是他的一个优点。selcet的一个缺点在于单个继承能够监视文件描述符的数量存在最大显示,在Linux上一般为1024,但是可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样荷叶灰造成效率的极低。
问:poll:
答:不同于selct使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。
pollfd解耦股包含了要监视的event和发生的event,不再使用 select"参数-值"传递方式。同时pollfd并没有最大数量限制(但是数量过大后性能也是会下降的)。和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符的增长,其效率也会线性下降。
问:epoll:(在Linux下面支持,在Windows下面不支持的):
答:epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的实际存在到内核的一个事件表中,这样用户和内核控件的copy只需一次。epoll其实时使用了红黑树算法实现的。具体的红黑树算法见:https://www.cnblogs.com/skywang12345/p/3245399.html。epoll并不代表比select好。在并发高的情况下,连接活跃度不高,epoll比select好。并发不高的话,同时连接很活跃,select比epoll好。
问:相关的举例:
答:
1.通过非阻塞IO实现http请求。
import socket from urllib.parse import urlparse # 通过非阻塞IO完成http请求 def get_url(url): # 通过socket请求html url = urlparse(url) host = url.netloc path = url.path if path == "": path = "/" # 建立socket连接 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.setblocking(False) try: client.connect((host,80)) # 阻塞不会消耗CPU except BlockingIOError as e: pass # 不停的询问连接是否建立好,需要while循环不停的去检查状态 # 做计算任务或再次发起连接请求。 while True: try: client.send("GET {} HTTP/1.1 Host:{} Connection:close ".format(path, host).encode('utf8')) break except OSError as e: pass data = b"" while True: try: d = client.recv(1024) except BlockingIOError as e: continue if d: data += d else: break data = data.decode("utf-8") html_data = data.split(" ")[1] print(html_data) client.close() if __name__ == '__main__': get_url("http://www.baidu.com")
2. 通过IO复用的select的方法:
我们这里使用selectors这个包的DefaultSelector的包,这个包比select包装更好的,而且选择poll方法和epoll方法会根据平台自动选择。还给我们提供了注册的机制。
import socket from urllib.parse import urlparse import select from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE selector = DefaultSelector() class Fetcher: def connected(self, key): selector.unregister(key.fd) self.client.send("GET {} HTTP/1.1 Host:{} Connection:close ".format(self.path, self.host).encode('utf8')) selector.register(self.client.fileno(),EVENT_READ,self.readable) def readable(self,key): d = self.client.recv(1024) if d: self.data += d else: selector.unregister(key.fd) data = self.data.decode("utf-8") html_data = data.split(" ")[1] print(html_data) self.client.close() def get_url(self,url): url = urlparse(url) self.host = url.netloc self.path = url.path self.data = b"" if self.path == "": self.path = "/" # 建立socket连接 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False) try: self.client.connect((self.host,80)) # 阻塞不会消耗CPU except BlockingIOError as e: pass #!!!!注册!!!! selector.register(self.client.fileno(),EVENT_WRITE,self.connected) def loop(): # 时间循环:不停的情请求socket的状态并调用对应的回调函数。 # 1. select 本身是不支持register模式的, # 2. socket状态编号以后的回调是由程序员完成的。 while True: ready = selector.select() for key,mask in ready: call_back = key.data call_back(key) # 回调+时间循环+select(poll/epoll) if __name__ == '__main__': fetcher = Fetcher() fetcher.get_url("http://www.baidu.com") loop()
运行这段代码的时候会提示:OSError: [WinError 10022] 提供了一个无效的参数。在Linux下面不会报错。
因此我们在Windows底下添加两个全局变量进行更改,就不会抛异常了:
import socket from urllib.parse import urlparse import select from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE selector = DefaultSelector() urls = ["http://www.baidu.com"] stop = False class Fetcher: def connected(self, key): selector.unregister(key.fd) self.client.send("GET {} HTTP/1.1 Host:{} Connection:close ".format(self.path, self.host).encode('utf8')) selector.register(self.client.fileno(),EVENT_READ,self.readable) def readable(self,key): d = self.client.recv(1024) if d: self.data += d else: selector.unregister(key.fd) data = self.data.decode("utf-8") html_data = data.split(" ")[1] print(html_data) self.client.close() urls.remove(self.spider_url) if not urls: global stop stop = True def get_url(self,url): self.spider_url = url url = urlparse(url) self.host = url.netloc self.path = url.path self.data = b"" if self.path == "": self.path = "/" # 建立socket连接 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False) try: self.client.connect((self.host,80)) # 阻塞不会消耗CPU except BlockingIOError as e: pass #!!!!注册!!!! selector.register(self.client.fileno(),EVENT_WRITE,self.connected) def loop(): # 时间循环:不停的情请求socket的状态并调用对应的回调函数。 # 1. select 本身是不支持register模式的, # 2. socket状态编号以后的回调是由程序员完成的。 while not stop: ready = selector.select() for key,mask in ready: call_back = key.data call_back(key) # 回调+时间循环+select(poll/epoll) if __name__ == '__main__': fetcher = Fetcher() fetcher.get_url("http://www.baidu.com") loop()
这种方式的好处就是并发性高。
问:回调之痛?
答:回调会产生很多问题:
如果回调函数执行不正常该如何?
如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?
如果嵌套了多层,其中某个环节出错了会造成什么后果?
如果有一个数据需要被每个回调都处理怎么办?
怎么使用当前函数中的局部变量?
总结回调的问题有三个方面:
1.可读性差
2.共享状态管理困难
3.异常处理困难
问:C10M问题?
答:C10M问题是随着互联网的飞速发展,如果利用八核CPU,64G内存,在10gbps的网络上保持10000并发并连接?C10K也满足不了我们了。因此这里就用到了协程了。
问:我们有什么样的处理思路?
答:回调模式有很多的缺点,协程就是为了编写难的问题。我们列举一下当前存在的问题:
1.回调模式编码复杂度高
2.同步编程的并发性不高
3.多线程编程需要线程间同步,用的锁的机制。
怎么解决?
1.采用同步的方式去编写异步的代码
2.使用单线程去切换任务:
1.线程是由操作系统切换的,单线程切换意味着我们需要程序员自己去调度任务
2.不再需要锁,并发性高。如果我们能在单线程直接切换就好像函数之间的调用一样,如果单线程内切换函数,性能远高于线程切换,而且它的并发性更高。如果我们声明1000个函数比声明1000个线程并发性越高。
要实现这些对现有的编程模式有很大的挑战。
问:一个关于函数的问题,关于函数是否可以暂停:
答:比如我们有这么一段代码:
def get_url(url): # do something: html = get_html(url) # 此处暂停,切换到另一个函数去执行 # parse html urls = parse_url(html) def get_url2(): # do something: html = get_html(url) # 此处暂停,切换到另一个函数去执行 # parse html urls = parse_url(html)
我们想一下:
传统函数调用过程就是A执行完执行B执行完执行C。
我们需要一个可以暂停的函数,并且可以在适当的时候回复该函数继续去执行。
如果能够这两个,是不是可以玩儿了。因此这个地方就出现了协程。
协程有两个定义:有多个入口的函数或者说可以暂停的函数(可以向暂停的地方传入值),我们感觉到生成器就是一个可以暂停的概念。
问:还记得我们的生成器怎么用嘛?
答:我们用生成器对象,然后next去调用。生成器是使用了我们迭代协议的。
def gen_func(): yield 1 yield 2 yield 3 return "bobby" if __name__ == '__main__': gen = gen_func() # 我们建立一个生成器对象 print(next(gen)) print(next(gen)) print(next(gen)) print(next(gen))
StopIteration: bobby
1. 生成器不只可以产生值,还可以接收值
def gen_func():
# 1.可以产出值,2.可以接收值(调用方船体进来的值)
html = yield "http://www.baidu.com"
print(html)
yield 2
yield 3
return "bobby"
if __name__ == '__main__':
gen = gen_func() # 我们建立一个生成器对象
# 1.启动生成器方法又两种,next和send
url = next(gen)
html = "bobby111"
print(gen.send(html))
# 2. send方法可以传递值,进入生成器内部,同时还可以重启生成器执行到下一个yield位置。
gen.send(html)
# bobby111
# 2
将我们的值传递给生成器内部,这里要注意的地方:第一次调用send的时候不能send一个非None的值,跟前面的一样,首先要给生成器进行初始化对象。在调用send发送非None值之前,我们必须启动一次生成器,方式有两种:
gen.send(None)
next(gen)
其中,html = yield 内容,跟我们说的一样,yield看做是return,把yield后面的值传递给html
def gen_func(): yield "http://www.baidu.com" yield 2 yield 3 return "bobby" if __name__ == '__main__': gen = gen_func() print(next(gen)) gen.close() next(gen) # 抛出异常:StopIteration:generator ignored GeneratotExit
我们看到他会抛异常。GeneratorExit是继承自BaseException,Exception是比他们更基础的,如果Try Except的话 。gen.close()不会抛异常。close()向上抛关闭。
def gen_func(): try: yield "http://www.baidu.com" except Exception as e: print(e) yield 2 yield 3 return "bobby" if __name__ == '__main__': gen = gen_func() print(next(gen)) gen.throw(Exception,"download error") next(gen) # gen.throw(Exception,"download error") # http: // www.baidu.com # download # error
另外,gen.throw的方式会抛第一个异常,需要处理,但是close不需要处理。
我们上面介绍了生成器的新的内容send,close,throw。
我们在介绍一个在Py3.3添加了yield from语法。
首先介绍一下chain,这个是把一些可迭代对象,连接起来。用for循环进行遍历。
from itertools import chain my_list = [1,2,3] my_dict = { "bobby1":"http://www.baidu.com", "bobby2":"http://www,sina.com" } for value in chain(my_list,my_dict,range(5,10)): print(value) # 1 # 2 # 3 # bobby1 # bobby2 # 5 # 6 # 7 # 8 # 9
我们再来改写一下一个函数来实现这个chain
from itertools import chain my_list = [1,2,3] my_dict = { "bobby1":"http://www.baidu.com", "bobby2":"http://www,sina.com" } def my_chain(*args,**kwargs): for my_iterable in args: for value in my_iterable: yield value for value in my_chain(my_list,my_dict,range(5,10)): print(value) # 1 # 2 # 3 # bobby1 # bobby2 # 5 # 6 # 7 # 8 # 9
这里就有yield from将代码进一步缩减:
yield from EXPR(可以简化)
1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw 和 .close方法。
2.如果自生成其支持.throw和close方法,但是子生成器内部,这两个方法都会抛出异常。
3.调用方让子生成器自己抛出异常。
4.当调用方使用next,send(None)函数,当调用方使用.send()发送非None值是,才调用子生成器.send方法。
from itertools import chain my_list = [1,2,3] my_dict = { "bobby1":"http://www.baidu.com", "bobby2":"http://www,sina.com" } def my_chain(*args,**kwargs): for my_iterable in args: yield from my_iterable # for value in my_iterable: # yield value for value in my_chain(my_list,my_dict,range(5,10)): print(value) # 1 # 2 # 3 # bobby1 # bobby2 # 5 # 6 # 7 # 8 # 9
再举一个例子:
def g1(iterable): yield iterable def g2(iterable): yield from iterable for value in g1(range(10)): print(value) for value in g2(range(10)): print(value) # 0 # 1 # 2 # 3 # 4 # 5 # 6 # 7 # 8 # 9
再举一个最重要的例子:
def g1(gen): yield from gen def main(): g = g1() g.send(None)
在这个例子中才是yield from最重要的应用,也是最核心的点,含义有两个方面:
1. main为调用方,
2. g1(委托生成器)
3. gen(子生成器)
4. yield from 会在调用方与子生成器之间一个双向通道。
根据这个我们举一个yield from 比较详细的一个例子:
final_result = {} def sales_sum(pro_name): total = 0 nums = [] while True: x = yield print(pro_name+"销量:",x) if not x: break total += x nums.append(x) return total,nums def middle(key): while True: final_result[key] = yield from sales_sum(key) print(key+"销量统计完成!!") def main(): data_sets = { "bobby牌面膜":[1200,1500,3000], "bobby牌手机":[20,55,98,100], "bobby牌大一":[280,560,778,70] } for key,data_set in data_sets.items(): m = middle(key) m.send(None) # 预激middle协程 for value in data_set: m.send(value) # 给谢忱个传递每一组值 m.send(None) print("final_result:",final_result) if __name__ == '__main__': main() # bobby牌面膜销量: 1200 # bobby牌面膜销量: 1500 # bobby牌面膜销量: 3000 # bobby牌面膜销量: None # bobby牌面膜销量统计完成!! # bobby牌手机销量: 20 # bobby牌手机销量: 55 # bobby牌手机销量: 98 # bobby牌手机销量: 100 # bobby牌手机销量: None # bobby牌手机销量统计完成!! # bobby牌大一销量: 280 # bobby牌大一销量: 560 # bobby牌大一销量: 778 # bobby牌大一销量: 70 # bobby牌大一销量: None # bobby牌大一销量统计完成!! # final_result: {'bobby牌面膜': (5700, [1200, 1500, 3000]), 'bobby牌手机': (273, [20, 55, 98, 100]), 'bobby牌大一': (1688, [280, 560, 778, 70])}
运用这个模式,就不用try exception处理异常了。就变得非常的简单,不用做大量的try...exception了。yield from帮我们完成了很多的工作。所以记住上面的那种格式。
如下我们对yield from做一个总结:
1. 子生成器产生的值,都是直接传给调用方的:调用方通过.send()发送的值都是直接传递给子生成器的:如果发送的是None,会调用子生成器__next__()方法,如果不是None,会调用子生成器的.send()方法。
2.子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
3.yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数。
4.如果调用的时候出现了StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上“冒泡”;
5.传入委托生成器的异常里,除了GenerationExit之外,其他的所有异常全部传递给子生成器的throw()方法,如果调用.throw()的是出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上“冒泡”;
6.如果在委托生成器上调用.close()或传入GenerationExit异常,会调用自生成的.close()方法,没有的话就不会调用。如果在调用.close()的时候抛出了异常,那么就向上“冒泡”,否则委托生成器会抛出GenerationExit异常。