python-study-36

python并发编程之协程

1、协程:
    单线程实现并发
    在应用程序里控制多个任务的切换+保存状态
    优点:
        应用程序级别速度要远远高于操作系统的切换
    缺点:
        多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地
        该线程内的其他的任务都不能执行了

        一旦引入协程,就需要检测单线程下所有的IO行为,
        实现遇到IO就切换,少一个都不行,以为一旦一个任务阻塞了,整个线程就阻塞了,
        其他的任务即便是可以计算,但是也无法运行了

2、协程的目的:
    想要在单线程下实现并发
    可利用的cpu只有一个
    并发本质=切换+保存状态
    并发指的是多个任务看起来是同时运行的
介绍
from gevent import monkey,spawn;monkey.patch_all()
from threading import current_thread
import time

def eat():
    print('%s eat 1' %current_thread().name)
    time.sleep(3)
    print('%s eat 2' %current_thread().name)

def play():
    print('%s play 1' %current_thread().name)
    time.sleep(1)
    print('%s play 2' %current_thread().name)

g1=spawn(eat,)
g2=spawn(play,)

print(current_thread().name)
g1.join()
g2.join()



并发套接字通信
server:
from gevent import spawn,monkey;monkey.patch_all()
from socket import *
from threading import Thread

def talk(conn):
    while True:
        try:
            data=conn.recv(1024)
            if len(data) == 0:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port,backlog=5):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(backlog)

    print('starting...')
    while True:
        conn, addr = server.accept()
        spawn(talk, conn,)

if __name__ == '__main__':
    g=spawn(server,'127.0.0.1',8080)
    g.join()

client:
from threading import Thread,current_thread
from socket import *
import os

def task():
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))

    while True:
        msg='%s say hello' %current_thread().name
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))

if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=task)
        t.start()
代码演示

python并发编程之IO模型

1 阻塞IO(blocking IO)

2 非阻塞IO(non-blocking IO)

3 多路复用IO(IO multiplexing)

4 异步IO(Asynchronous I/O)

1 阻塞IO(blocking IO)

'''
网络IO:
    recvfrom:
        wait data:等待客户端产生数据——》客户端OS--》网络--》服务端操作系统缓存
        copy data:由本地操作系统缓存中的数据拷贝到应用程序的内存中

    send:
        copy data

'''

# conn.recv(1024) ==>OS


在linux中,默认情况下所有的socket都是blocking
    当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

    而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
    所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图

    ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

    实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
阻塞IO

2 非阻塞IO(non-blocking IO)  代码演示

Linux下,可以通过设置socket使其变为non-blocking

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

    也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

  所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

    但是非阻塞IO模型绝不被推荐。

    我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

    但是也难掩其缺点:

#1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
#2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
    此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
介绍
from socket import *
import time

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)

conn_l=[]
while True:
    try:
        print('总连接数[%s]' % len(conn_l))
        conn,addr=server.accept()
        conn_l.append(conn)
    except BlockingIOError:
        del_l=[]
        for conn in conn_l:
            try:
                data=conn.recv(1024)
                if len(data) == 0:
                    del_l.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
               del_l.append(conn)

        for conn in del_l:
            conn_l.remove(conn)
server
from socket import *
import os

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg='%s say hello' %os.getpid()
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))
client

上节课复习

上节课复习:
    1、GIL全局解释器锁
        什么是?
            GIL本质就是一个把互斥锁,是将多个并发的线程对共享数据的修改
            变成“串行”

        为何有?
           Cpython解释器的垃圾回收机制不是线程安全的


        如何用?
            有GIL的存在,导致同一个进程内的多个线程同一时刻只能有一个运行
            即同一进程的多个线程无法实现并行=》无法利用多核优势
            但是可以实现并发的效果

        什么是多核优势?
            多核即多个cpu,多个cpu带来的优势是计算性能的提升,所以

            IO密集型:
                多线程
            计算密集型
                多进程

        GIL vs 自定义互斥锁
            GIL相当于执行权限,意思是在一个进程内的多个线程想要执行,必须
            先抢GIL,这把锁的特点是,当一个线程被剥夺走cpu的执行权限的同时会被
            解释器强行释放GIL

    2、进程池与线程池
        什么是?
            池:池用来限制进程/线程个数的一种机制
        为何用?
            当并发的任务数远大于计算机的承受能力,应该用池的概念
            将并发的进程数/线程数控制在计算机能承受的数目

        如何用?
            from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
            def task(n):
                return n**2
            def func(future):
                print(future.result())

            if __name__ == '__main__':
                p=ProcessPoolExecutor(4)
                p.submit(task,10).add_done_callback(func)

                p.shutdown(wait=True)
                # pool.close()
                # pool.join()
                print('')

        提交任务的两种方式:
            1、同步:提交完任务后就在原地等待,直到任务运行完毕并且拿到返回值后,才运行下一行代码
            2、异步:提交完任务(绑定一个回调函数)后不原地等待,直接运行下一行代码,等到任务运行有返回值自动触发回调的函数的运行

        程序的运行状态(阻塞,非阻塞)
            1、阻塞:
                IO阻塞
            2、非阻塞:
                运行
                就绪

今日内容:
    1、协程
    2、IO模型
        阻塞IO
        非阻塞IO
        IO多路复用
        异步IO
View Code
原文地址:https://www.cnblogs.com/xujinjin18/p/9325821.html