10_多协程

1.协程概述

    1.协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元,协程也称作微线程,在本质只有一个线程在运行

    2.在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行

    3.协程是通过生成器函数实现,函数间切换的次数以及什么时候再切换到原来的函数都由开发者自己确定

    4.协程原理: 通过应用层记录程序的上下文栈区,实现程序运行中的跳跃,进而选择代码段执行

2.使用生成器函数实现协程

import time


def work1():
    while True:
        print("----work1---")
        yield
        time.sleep(0.5)


def work2():
    while True:
        print("----work2---")
        yield
        time.sleep(0.5)


def main():
    w1 = work1()
    w2 = work2()
    while True:
        next(w1)
        next(w2)


if __name__ == "__main__":
    main()
"""执行结果
    ----work1---
    ----work2---
    ----work1---
    ----work2---
    ----work1---
    ----work2---
    ...
"""

3.使用greenlet模块实现协程

# greenlet模块是对yield的封装
# greenlet 只是可以实现一个简单的切换功能,还是不能做到遇到IO就切换
# g1 = greenlet(func)  # 实例化一个对象
# g1.switch()  # 用这种方式去调用func函数
# 当使用switch调用func的时候,什么时候func会停止运行?
#    1 要么return
#    2 要么在func内部又遇到 switch
from greenlet import greenlet
import time


def test1():
    while True:
        print("---A--")
        gr2.switch()  # 切换但gr2协程函数
        time.sleep(0.5)


def test2():
    while True:
        print("---B--")
        gr1.switch()  # 切换到gr1协程函数
        time.sleep(0.5)


# 生成协程对象
gr1 = greenlet(test1)
gr2 = greenlet(test2)

# 切换到gr1中运行
gr1.switch()
"""执行结果
    ---A--
    ---B--
    ---A--
    ---B--
    ---A--
    ---B--
    ...
"""

4.使用gevent模块实现协程

    gevent实现协程原理
        gevent模块是对greenlet模块的封装,原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络,文件操作等)操作时
        例如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行

    示例

# gevent 可以实现当函数中遇到io操作时,就自动的切换到另一个函数
# g1 = gevent.spawn(func, 参数)
# gevent.join(g1)  # 让func执行完毕
# gevent.joinall([g1, g2, g3, g4])
# func停止的原因: 1.func执行完了;2.遇到IO操作了
import gevent


def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)  # gevent.getcurrent()获取当前协程对象
        # 用来模拟一个耗时操作,此处不能是time模块中的sleep
        gevent.sleep(1)


# gevent.spawn(func, argv) 将func函数变成协程时间并启动,返回一个协程对象
# 参数1: func 事件函数
# 参数2: argv 多项,为func的参数
g1 = gevent.spawn(f, 3)
g2 = gevent.spawn(f, 2)
g3 = gevent.spawn(f, 3)

# gevent.join(g1)  # 等待回收协程,参数为要回收的协程对象
# gevent.joinall([g2, g3])  # 回收多个协程,参数为列表

g1.join()  # 等待g1指向的任务执行结束
g2.join()  # 通过协程对象的方法也可以回收协程
g3.join()
"""执行结果
    <Greenlet at 0x10b63aef0: f(3)> 0
    <Greenlet at 0x10b72b050: f(2)> 0
    <Greenlet at 0x10b72b170: f(3)> 0
    <Greenlet at 0x10b63aef0: f(3)> 1
    <Greenlet at 0x10b72b050: f(2)> 1
    <Greenlet at 0x10b72b170: f(3)> 1
    <Greenlet at 0x10b63aef0: f(3)> 2
    <Greenlet at 0x10b72b170: f(3)> 2
"""

5.协程的最终实现方法: gevent + monkey

# 在实际开发中因为不确定那些地方的延时要替换成gevent中的延时,所以使用monkey给程序打补丁
from gevent import monkey
import gevent
import random
import time

# 有耗时操作时需要
monkey.patch_all()  # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块


def coroutine_work(coroutine_name):
    for i in range(10):
        print(coroutine_name, i)  # gevent.getcurrent()获取当前协程对象
        time.sleep(random.random())


gevent.joinall([
    gevent.spawn(coroutine_work, "work1"),
    gevent.spawn(coroutine_work, "work2")
])

6.串行和协成并发效率对比

from gevent import monkey
monkey.patch_all()
import gevent
import time


def func1(num):
    time.sleep(1)
    print(num)


# 串行执行
start = time.time()
for i in range(10):
    func1(i)
print(time.time() - start)  # 10.00

# 协成并发执行
start = time.time()
lst = []
for i in range(10):
    g = gevent.spawn(func1, i)
    lst.append(g)
gevent.joinall(lst)
print(time.time() - start)  # 1.00

7.协程实现并发下载器

from gevent import monkey
import gevent
import urllib.request

# 有耗时操作时需要
monkey.patch_all()


def my_downLoad(url):
    print('GET: %s' % url)
    resp = urllib.request.urlopen(url)
    data = resp.read()

    with open(file_name, "wb") as f:
        f.write(data)

    print('%d bytes received from %s.' % (len(data), url))


gevent.joinall([
    gevent.spawn(my_downLoad, 'http://www.baidu.com/'),
    gevent.spawn(my_downLoad, 'https://www.cnblogs.com/'),
    gevent.spawn(my_downLoad, 'https://www.cnblogs.com/tangxuecheng/')
])

8.协程并发-新浪图集爬虫

import urllib.request
import gevent
from gevent import monkey
import re
import os

monkey.patch_all()
IMG_URL = "http://slide.games.sina.com.cn/"


def themes(img_utl):
    """爬取主题图集分类"""
    req = urllib.request.urlopen(img_utl)
    req = str(req.read())

    # 取主题正则
    theme_re = r"<li> <a href="(.*?)" target="_blank" class="game_hover">"
    com = re.compile(theme_re)
    req_iterator = com.finditer(req)

    # 清洗数据-->迭代器的前两个成员无效url,拿到前两个值后直接丢弃
    next(req_iterator)
    next(req_iterator)

    return req_iterator


def pages(img_url, req_iterator):
    """爬取页面图集分类"""
    j = 1
    for i_url in req_iterator:
        i_url = i_url.group(1)
        # print(i_url)
        pages_req = urllib.request.urlopen(i_url)
        pages_req = str(pages_req.read())

        # 取页码正则
        page_num_re = r"<!--  -->.*?href=".*?page=(d+).*?".*?<!--  -->"
        com = re.compile(page_num_re)
        page_num_iterator = com.finditer(pages_req)
        page_num = int(next(page_num_iterator).group(1))

        # 取页面url
        pages_re = r"</a><a  style=.*?href="(.*?)" title=".*?">.*?<!--  -->"
        com = re.compile(pages_re)
        pages_iterator = com.finditer(pages_req)
        pages_url = img_url + next(pages_iterator).group(1)

        for i in range(1, page_num + 1):
            new_pages_url = pages_url + "&page=" + str(i) + "&dpc=1"
            pictures(new_pages_url, j, i)
        j += 1


def pictures(new_pages_url, j, i):
    """爬取分类中每页的图片"""
    req = urllib.request.urlopen(new_pages_url)
    req_str = str(req.read())
    img_re = r"img src="(.*?)" class="
    com = re.compile(img_re)
    req_str = com.findall(req_str)
    k = 1
    for req_msg in req_str:
        i_msg = os.getcwd() + "/新浪图集/" + "网站板块" + str(j) + "/" + "网站第" + str(i) + "" + "/" + str(k) + ".jpg"
        if not os.path.exists(os.getcwd() + "/新浪图集/" + "网站板块" + str(j) + "/" + "网站第" + str(i) + "" + "/"):
            os.makedirs(os.getcwd() + "/新浪图集/" + "网站板块" + str(j) + "/" + "网站第" + str(i) + "" + "/")

        # 启动协程-当出现延时时自动切换协程函数
        gevent.joinall([
            gevent.spawn(downloader, i_msg, req_msg)
        ])
        k += 1


def downloader(i_msg, req_msg):
    """协程实现-并发下载"""
    req = urllib.request.urlopen(req_msg)

    img_content = req.read()

    with open(i_msg, "wb") as f:
        f.write(img_content)


def main():
    req_iterator = themes(IMG_URL)
    pages(IMG_URL, req_iterator)


if __name__ == "__main__":
    main()

9.关于猴子补丁的使用和深坑

    1.猴子补丁使用
        from gevent import monkey

        # 有耗时操作时需要,例如在导入socket模块前使用可以将socket模块的IO设置为非阻塞
        monkey.patch_all()

    2.猴子补丁的深坑
        1.导入包的位置,补丁以下都会被改成阻塞
        2.patch_all的参数默认值
            patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True,
                subprocess=True, sys=False, aggressive=True, Event=True,
                builtins=True, signal=True,
                queue=True, contextvars=True,
                **kwargs)
        3.开启多线程时,monkey会阻塞主线程执
        4.决绝方法: monkey.patch_all(thread=False)

10.协程通讯服务器

import gevent
from gevent import monkey

# 在导入socket前执行,改变socket的阻塞状态
monkey.patch_all()
from socket import *
from time import ctime


def server(port):
    # 创建tcp流式套接字
    tcp_server_socket = socket()
    # 设置端口重用
    tcp_server_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    # 绑定地址
    tcp_server_socket.bind(("0.0.0.0", port))
    # 开启监听,创建监听队列
    tcp_server_socket.listen(128)

    # 循环为客户端服务
    print("服务器正常运行,等待客户端连接")
    while True:
        client_socket, client_addr = tcp_server_socket.accept()
        print("客户端%s已连接服务器" % str(client_addr))
        gevent.spawn(handler, client_socket, client_addr)


# 处理客户端事件
def handler(client_socket, client_addr):
    while True:
        data = client_socket.recv(1024)
        if not data:
            break
        print("%s: %s" % (str(client_addr), data.decode()))
        client_socket.send(ctime().encode())
    client_socket.close()


def main():
    # 运行服务器
    server(7890)


if __name__ == "__main__":
    main()

11.协程通讯客户端

import socket


def main():
    # 创建数据流套接字
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 连接服务器
    server_ip = input("请输入要连接的服务器的ip:")
    serve_port = int(input("请输入要连接的服务器的port:"))
    server_addr = (server_ip, serve_port)
    tcp_client_socket.connect(server_addr)

    while True:
        # 发送数据
        send_data = input("请输入要发生的数据(quit退出):")
        if send_data.lower() == "quit":
            break
        tcp_client_socket.send(send_data.encode("utf-8"))
        # 接收服务器发送过来的数据
        recv_data = tcp_client_socket.recv(1024)
        print("接收到的数据为:%s" % recv_data.decode("utf-8"))
    # 关闭套接字
    tcp_client_socket.close()


if __name__ == "__main__":
    main()
原文地址:https://www.cnblogs.com/tangxuecheng/p/13634406.html