(2)协程之 greenlet模块

一、greenlet模块


如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现这20个任务直接的切换。

from greenlet import greenlet
# greenlet能实现很方便的切换,但是不能实现检测到 I/O才切

# 吃一会返,玩一会手机,来回切换
def eat(name):
    print("%s eat 1." % name)
    g2.switch("托儿所")        # 第一次切换
    print("%s eat 2." % name)
    g2.switch()

def play(name):
    print("%s play 1" % name)
    g1.switch()
    print("%s play 2" % name)

g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch("托儿所")     # 启动/切换,可以在第一次switch时传入参数,以后都不需要。

"""
托儿所 eat 1.
托儿所 play 1
托儿所 eat 2.
托儿所 play 2
"""

单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。

# 顺序执行
import time
def f1():
    res  =1
    for i in range(100000000):
        res += i

def f2():
    res = 1
    for i in range(100000000):
        res *= i

start = time.time()
f1()
f2()
stop = time.time()
print('run time is %s' % (stop-start))  
# 10.985628366470337
顺序执行
# 切换
from greenlet import greenlet
import time
def f1():
    res = 1
    for i in range(100000000):
        res += i
        g2.switch()

def f2():
    res = 1
    for i in range(100000000):
        res *= i
        g1.switch()

start = time.time()
g1 = greenlet(f1)
g2 = greenlet(f2)
g1.switch()
stop = time.time()
print('run time is %s' % (stop-start))   
# 52.763017892837524
切换

greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。

二、gevent模块


Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以 C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

# 用法

# 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
g1 = gevent.spawn(func,1,2,3,x=4,y=5)    

g2 = gevent.spawn(func2)

g1.join()   # 等待g1结束

g2.join()   # 等待g2结束

# 或者上述两步合作一步:gevent.joinall([g1,g2])

g1.value    # 拿到func1的返回值

遇到IO阻塞时会自动切换任务:

import gevent
import time

def eat(name):
    print("%s eat 1." % name)
    gevent.sleep(3)             # 与 time.sleep() 是一样的道理
    print("%s eat 2." % name)

def play(name):
    print("%s play 1" % name)
    gevent.sleep(4)
    print("%s play 2" % name)

start = time.time()

g1 = gevent.spawn(eat,"托儿所")    # 提交任务,(任务名,参数)
g2 = gevent.spawn(play,"托儿所")   # 异步提交,不等待结果,提交完后,有可能这俩任务还没开起来,就结束了。

g1.join()       # 等这俩任务执行,
g2.join()
# 或者gevent.joinall([g1,g2])

stop = time.time()
print(start-stop)

"""
托儿所 eat 1.
托儿所 play 1
托儿所 eat 2.
托儿所 play 2
-4.05228066444397
"""

上例 gevent.sleep(3)模拟的是 gevent可以识别的 I/O阻塞,而 time.sleep(3)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了,

from gevent import monkey;monkey.patch_all()

必须放到被打补丁者的前面,如 time,socket模块之前,或者我们干脆记忆成:要用gevent,需要将

from gevent import monkey;monkey.patch_all() 放到文件的开头。

# 但凡要用 gevent模块实现一个检测 I/O操作,就需要在整个文件的开头写上下面两行代码。
from gevent import monkey;monkey.patch_all()
# monkey.path_all()   

import gevent
import time

def eat(name):
    print("%s eat 1." % name)
    time.sleep(3)               # 即便不是 gevent模块的 I/O操作,加上最上面的两行代码,也是能够实现监测的
    print("%s eat 2." % name)

def play(name):
    print("%s play 1" % name)
    time.sleep(4)
    print("%s play 2" % name)

start = time.time()

g1 = gevent.spawn(eat,"托儿所")
g2 = gevent.spawn(play,"托儿所")

g1.join()
g2.join()

stop = time.time()
print(start-stop)

"""
托儿所 eat 1.
托儿所 play 1
托儿所 eat 2.
托儿所 play 2
-4.036099433898926
如果串行的话,需要7s多,
"""

我们可以用 threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程。

# 如果单线程下多个任务是计算密集型,gevent模块就没用了,因为gevent模块就是要监测 I/O,
# 遇到 I/O才切,好利用第一个任务 I/O的时间把第二个任务顺便给做了,实现提升单线程的运行效率的效果。

# gevent模块的应用场景是:单线程下多个任务时,I/O密集型。

三、练习


通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)

服务端:

# 基于 gevent模块实现
from gevent import monkey,spawn;monkey.patch_all()      # spawn用它来提交线程对象
from socket import *

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:continue
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn, addr = server.accept()
        spawn(communicate,conn)     # 这里没必要加 join,死循环,不会结束,

    server.close()

if __name__ == '__main__':
    g = spawn(server,"192.168.2.209",8900)  # 异步提交,必须join
    g.join()

客户端:

from socket import *
from threading import Thread,currentThread

def client():
    client = socket(AF_INET,SOCK_STREAM)
    client.connect(("192.168.2.209",8900))

    while True:
        client.send(("%s say hello." % currentThread().getName()).encode("utf-8"))
        data = client.recv(1024)
        print("收到的数据:%s" % data.decode("utf-8"))

    client.close()

if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=client)
        t.start()
原文地址:https://www.cnblogs.com/zoling7/p/13403020.html