并发编程之协程

1.异步回调

回调函数指的是a交给b一个任务,b在执行完成之后回过头调用了a的一个函数,称之为回调。

为什么需要回调函数?因为需要获取异步任务的结果,但是又不应该阻塞,这时回调函数就可以提高效率,高效的获取任务的结果。

什么时候使用回调函数?通常异步任务都会和回调函数一起使用。

使用方式:使用add_done_callback()函数给Future对象绑定一个回调函数

注:在多进程中回调函数是交给主进程来执行,而在多线程中,回调函数是谁有空谁执行(不可能是主线程)。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests, re
from threading import current_thread


# 爬虫
# 1.从目标站点下载网页数据,本质就是HTML格式字符串
# 2.用re正则表达式,提取出你所需要的数据
def get_data(url):
    print('%s向%s发起请求' % (current_thread().name, url))
    res = requests.get(url)
    print('%s请求%s成功' % (current_thread().name, url))
    return res


def parser(obj):
    res = obj.result()
    htm = res.content.decode('utf-8')
    l = re.findall('href=(.*?com)', htm)
    print('%s解析成功,共%s个连接' % (current_thread().name, len(l)))


pool = ThreadPoolExecutor(2)

url = ['https://www.baidu.com',
       'https://www.jd.com',
       'https://www.python.org',
       'https://www.baidu.com',
       'https://www.cnblogs.com', ]
for i in url:
    obj = pool.submit(get_data, i)  
    obj.add_done_callback(parser)  # 回调parser函数,并会将obj传入。

# ThreadPoolExecutor-0_0向https://www.baidu.com发起请求
# ThreadPoolExecutor-0_1向https://www.jd.com发起请求
# ThreadPoolExecutor-0_1请求https://www.jd.com成功
# ThreadPoolExecutor-0_1解析成功,共148个连接
# ThreadPoolExecutor-0_1向https://www.python.org发起请求
# ThreadPoolExecutor-0_0请求https://www.baidu.com成功
# ThreadPoolExecutor-0_0解析成功,共13个连接
# ThreadPoolExecutor-0_0向https://www.baidu.com发起请求
# ThreadPoolExecutor-0_0请求https://www.baidu.com成功
# ThreadPoolExecutor-0_0解析成功,共13个连接
# ThreadPoolExecutor-0_0向https://www.cnblogs.com发起请求
# ThreadPoolExecutor-0_0请求https://www.cnblogs.com成功
# ThreadPoolExecutor-0_0解析成功,共153个连接
# ThreadPoolExecutor-0_1请求https://www.python.org成功
# ThreadPoolExecutor-0_1解析成功,共55个连接
多线程异步回调

2.线程队列

线程队列与进程队列的区别:进程队列可以被多进程共享,而线程中的队列就是一个普通的容器,不能进程共享。

线程队列三种方式:

from queue import Queue, LifoQueue, PriorityQueue
from threading import Thread

# 1.先进先出
q1 = Queue()
q1.put(1)
q1.put(2)
q1.put(3)
print(q1.get())
print(q1.get())
print(q1.get())

# 2.先进后出(堆栈)
q2 = LifoQueue()

q2.put(1)
q2.put(2)
q2.put(3)

print(q2.get())
print(q2.get())
print(q2.get())

# 优先级队列
q3 = PriorityQueue()

q3.put((2, 1))  # 第一个参数设置优先级,第二个参数传入数据
q3.put((1, 2))  # 取出顺序是由小到大,优先级可以是数字或者字符,只要能比较大小即可
q3.put((3, 3))

print(q3.get()[-1])
print(q3.get()[-1])
print(q3.get()[-1])

3.线程事件(Event)

事件:指的是用于协调多个线程工作的,当一个线程要执行某个操作,需要获取另一个线程的状态。

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

import time
from threading import Thread
from threading import Event

# 使用变量类完成多线程协作
# is_boot = False
# def start():
#     global is_boot
#     print("正在启动服务器......")
#     time.sleep(5)
#     print("服务器启动成功!")
#     is_boot = True
#
# def connect():
#     while True:
#         if is_boot:
#             print("连接服务器成功!")
#             break
#         else:
#             print("连接服务器失败!")
#         time.sleep(0.5)
#
#
# Thread(target=start).start()
# Thread(target=connect).start()
变量类多线程协作
from threading import Thread, Event
import time

e = Event()  # 默认值为False


def start():
    print('服务器启动。。')
    time.sleep(3)
    print('服务器启动成功')
    e.set()  # 就是把事件的值设置为True


def connect():
    for i in range(3):
        print('等待服务器启动。。')
        e.wait(0.5)  # 可以设置等待时间,等待e.set()执行,也就是将事件的值设置为True
        if e.is_set():
            print('连接成功。。。')
            break
        else:
            print('连接失败。。。')
    else:
        print('放弃连接')
Thread(target=start).start()
Thread(target=connect).start()
# 服务器启动。。
# 等待服务器启动。。
# 连接失败。。。
# 等待服务器启动。。
# 连接失败。。。
# 等待服务器启动。。
# 连接失败。。。
# 放弃连接
# 服务器启动成功
Event完成多线程协作
event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False

4.单线程实现并发

并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那么就可以实现单线程并发。

python中的生成器就具备这样的特点,每次调用next都会执行回到生成器函数中执行代码,者意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着会自动保存执行状态

import time


def func1():
    a = 1
    while True:
        a += 1
        print(a)
        print('func1 run')
        yield


def func2():
    g = func1()  # 获取生成器
    while True:
        print('func2 run')
        time.sleep(2)
        next(g)  # 执行生成器中的代码


func2()

虽然并发实现,但是还有问题,看一下代码:

import time

# 生成器执行纯计算任务
def func1():
    a = 0
    for i in range(10000000):
        a += i
        yield
def func2():
    g = func1()  # 获取生成器
    b = 0
    for i in range(10000000):
        b += i
        next(g)  # 执行生成器中的代码
start = time.time()
func2()
print('执行时间为 :%s' % (time.time()-start))

# 执行时间为 :2.2250561714172363


# 普通调用函数执行纯计算任务
def func1():
    a = 0
    for i in range(10000000):
        a += i
def func2():
    b = 0
    for i in range(10000000):
        b += i
start = time.time()
func1()
func2()
print('执行时间为 :%s' % (time.time()-start))

# 执行时间为 :1.0411033630371094

可以看到对于纯计算任务而言,单线程并发反而使执行效率下降了一半左右,所以这样的方案对于纯计算任务而言使没有必要的,在上述代码中,使用yield来切换使得代码结构非常混乱,所以就引入了greenlet模块,专门对yield进行了封装。

greenlet模块

import greenlet
import time
def task1():
    print('task1 run')
    time.sleep(5)  # 等待5秒
    g2.switch()  # 切换到task2任务中
    print('task1 run')

def task2():
    print('task2 run')
    g1.switch()  # 切换到task1任务中

g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)

g1.switch()  # 相当于开启一个线程

# task1 run
# task2 run
# task1 run

该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield,还是greenlet模块都不能检测I/O操作,遇到i/o时同样进程阻塞状态,所以此时的并发没有任何意义。现在我们需要一种方案既可以检测I/O又能够实现单线程并发,于是引入了gevent模块。

5.协程

协程:是单线程下的并发,就是在应用程序中控制多个任务的切换+保存状态,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

需要强调的是:

1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到I/O或执行时间过长,就会被迫交出CPU执行权限,切换其他线程运行)

2.单线程内开启协程,一旦遇到I/O,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非I/O操作的切换与效率无关)

对比操作系统控制线程的切换,用户在单线程内控制协程的切换

优点:

1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

2.单线程内就可以实现并发的效果,最大限度利用CPU

缺点:

1.协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程。

2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。

6.gevent模块

gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

在Python中使用Gevent模块来实现协程,它可以在多个任务之间进行切换,并且可以自己检测I/O。

import gevent
def task1():
    print('task1 run')
    gevent.sleep(3)
    print('task1 run')

def task2():
    print('task2 run')

g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)

# g1.join()
# g2.join()
gevent.joinall([g1, g2])  # 开启所有协程

上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

from gevent import monkey
import gevent
import time
monkey.patch_all()  
def task1():
    print('task1 run')
    time.sleep(3)
    print('task1 run')

def task2():
    print('task2 run')

g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)

# g1.join()
# g2.join()
gevent.joinall([g1, g2])  # 开启所有协程

需要注意:

1.协程执行时要想使任务执行则必须对协程对象调用join函数

2.有多个任务时,随便调用哪一个的join都会并发的执行所有任务,但是需要注意如果一个存在io的任务没有被join该任务将无法正常执行完毕

3.monkey补丁的原理是把原始的阻塞模块替换为修改后的非阻塞模块,即偷梁换柱,来实现IO自定切换,所以打补丁的位置一定要放到导入阻塞模块之前

 

原文地址:https://www.cnblogs.com/wangke0917/p/10220813.html