Python协程

前言

什么是单线程下的并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。

并发的本质:切换+保存状态

yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级

 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换

生成器实现切换

def wrapper(func):
    print('wrap')
    def inner(*args, **kwargs):
        res = func(*args, **kwargs)
        next(res)
        return res
    return inner

@wrapper
def Consumer():
    while True:
        x = yield
        print('con {}'.format(x))



# 协程 微线程可以切换的函数,或者生成器
def Producer(c):
    # c.send(None) # 启动生成器 ,代码运行到yield
    n = 0
    while n < 5:
        n = n + 1
        print('producer {}'.format(n))
        """
        send方法会首先把上一次挂起的yield语句的返回值通过参数设定, 
        从而实现与生成器方法的交互。但是需要注意,在一个生成器对象没有执行next方法之前,
        由于没有yield语句被挂起,所以执行send方法会报错.除非执行send(None).
        """
        c.send(n)

res = Consumer()
Producer(res)

 对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,

这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,

让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

协程

协程:微线程, 协程是一种用户态的轻量级线程,CPU不知道它的存在,即协程是由用户程序自己控制调度的。

优点:

1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点:

1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

协程特点:

  1. 必须在只有一个单线程里实现并发

  2. 修改共享数据不需加锁

  3. 用户程序里自己保存多个控制流的上下文栈

Greenlet模块

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator(生成器)

greenlet使用

价值一: 高性能的原生协程

价值二: 语义更加明确的显式切换

价值三: 直接将函数包装成协程,保持原有代码风格

安装 sudo pip3 install greenlet

from greenlet import greenlet
import random
import time


def producer():
    while True:
        item = random.randint(0, 10)
        print('producer %s' %item)
        c.switch(item)  # 切换到消费者, 并将item传入消费者 切换到C
        time.sleep(1)
        print('sss')


def consumer():
    print("我先执行")
    while True:
        item = p.switch()  # 切换到生产者,并等待生产者传入item (恢复时接收到数据)
        print('consume %s' % item)


if __name__ == '__main__':   
    c = greenlet(consumer)  # 将一个普通函数变为协程
    p = greenlet(producer)
    c.switch()  # 让消费者进入暂停状态(只有恢复才能接受到数据)

 swich()  就是切换,   按执行顺序--  但是遇到IO操作 好像并没有自动切换

Gevent模块

gevent 是一个第三方库,通过greenlet实现协程,核心就是在遇到IO操作,会自动切换状态

安装 sudo pip3 install gevent

举例使用

from gevent import monkey;monkey.patch_all()# monkey补丁 会把python标准库当中的一些阻塞操作变为非阻塞(要写在第一行)
import gevent


def test1():
    print(12)
    gevent.sleep(2)  # 模拟网络请求
    """
        在gevent模块里面要用gevent.sleep(2)表示阻塞,进行切换
        然而我们经常用time.sleep()用习惯了,那么有些人就想着
        可以用time.sleep(),那么也不是不可以。要想用,就得在
        最上面导入from gevent import monkey;monkey.patch_all()这句话
        如果不导入直接用time.sleep(),就不会切换,从而实现不了单线程并发的效果了
    """
    print(34)


def test2():
    print(72)
    gevent.sleep(1)
    print(89)

if __name__ == '__main__':

    # joinall阻塞当前执行流程,执行给定greenlet
    # spawn 启动协程 参数就是函数名和参数
    gevent.joinall([gevent.spawn(test1), gevent.spawn(test2)])
    print('complete')

需要说明的是:

gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

协程并发服务器

from gevent import monkey;monkey.patch_all()  # 打补丁 会把socket变成非阻塞
import socket
import time
import gevent

server = socket.socket()
server.bind(('127.0.0.1',9988))
server.listen(5)

def worker(conn,addr):
    """
    协程切换,负责和客户端连接
    :param conn:
    :param addr:
    :return:
    """
    while True:
        data = conn.recv(1024)
        if data: #
            print('{}:{}'.format(addr,data.decode()))
            conn.send(data.upper())
        else: # 正常断开,会收到空消息(回车不算空消息)
            print('close{}'.format(addr))
            break
    conn.close()

if __name__ == '__main__':
    while True:
        print('-------主线程,等待连接------')
        conn, addr = server.accept()
        print('创建一个新的协程,和客户端{}通信'.format(addr))
        gevent.spawn(worker, conn, addr)

---------输出
创建一个新的协程,和客户端('127.0.0.1', 15529)通信
-------主线程,等待连接------
创建一个新的协程,和客户端('127.0.0.1', 15530)通信
-------主线程,等待连接------
创建一个新的协程,和客户端('127.0.0.1', 15531)通信
-------主线程,等待连接------
创建一个新的协程,和客户端('127.0.0.1', 15532)通信
-------主线程,等待连接------
创建一个新的协程,和客户端('127.0.0.1', 15533)通信
-------主线程,等待连接------
创建一个新的协程,和客户端('127.0.0.1', 15534)通信
-------主线程,等待连接------
创建一个新的协程,和客户端('127.0.0.1', 15535)通信
-------主线程,等待连接------
('127.0.0.1', 15529):say hello 7
('127.0.0.1', 15530):say hello 8
('127.0.0.1', 15531):say hello 9
('127.0.0.1', 15532):say hello 10
('127.0.0.1', 15533):say hello 11
('127.0.0.1', 15534):say hello 12
('127.0.0.1', 15535):say hello 13
……

  

客户端(模拟多个客户端发消息)

import socket
import threading
from threading import currentThread


def task(i):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 9988))
    client.send(("say hello %s" %i).encode('utf-8'))
    print(client.recv(1024).decode('utf-8'))
    client.close()

if __name__ == '__main__':

    for i in range(100):
        t = threading.Thread(target=task,args=(i,))
        t.start()

------------输出
SAY HELLO 2
SAY HELLO 1
SAY HELLO 5
SAY HELLO 3
SAY HELLO 6
SAY HELLO 0
SAY HELLO 4
SAY HELLO 7
SAY HELLO 8
SAY HELLO 9
SAY HELLO 10
SAY HELLO 11
SAY HELLO 12
SAY HELLO 13
SAY HELLO 14
……

  

协程间的队列通信

# 协程遇到阻塞会默认切换
from gevent import monkey;monkey.patch_all()  # 打补丁 会动态把部分python标准库变成非阻塞
import time
import gevent
import time
import random
from gevent.queue import Queue


def producer(queue):
    while True:
        s = random.randint(1,9)
        print('producer {}'.format(s))
        queue.put(s)
        # gevent.sleep(2)  # 设置阻塞,切换到消费者
        time.sleep(2)


def consumer(queue):
    while True:
            s = queue.get()  # 没有元素后会阻塞,切换到生产者
            print('consumer {}'.format(s))


if __name__ == '__main__':
    queue = Queue(1)
    print('start producer')
    p = gevent.spawn(producer,queue)  # 开启生产者
    print('start consumer')
    q = gevent.spawn(consumer,queue)  # 开启消费者

    gevent.joinall([p,q])

队列通信2

from gevent import monkey;monkey.patch_all()
from gevent import queue
import gevent
import time

def producer(q, name, a):
    for i in range(5):
        s = '[{}] {}'.format(a, i)
        print('<{}> producer'.format(name), s)
        q.put(s)
        time.sleep(1)


def consumer(q, name):
    while True:
        s = q.get()
        if s is None: break
        print('<{}> consumer'.format(name), s)
        # gevent.sleep(2)


if __name__ == '__main__':
    q = queue.Queue()
    print('start')
    s1 = gevent.spawn(producer,q, 'egon', '玉米')
    s2 = gevent.spawn(consumer,q, 'alex')
    gevent.joinall([s1])
    q.put(None)

  

协程在爬虫中的应用

from gevent import monkey;monkey.patch_all() # 打补丁
import gevent
import requests
import time


def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

结果

原文地址:https://www.cnblogs.com/xiao-apple36/p/8668743.html