异步回调,事件,线程池与协程

在发起一个异步任务时,指定一个函数任务完成后调用函数

为什么需要异步

在使用线程池或进程池提交任务时想要任务的结果然后将结果处理,调用shudown 或者result会阻塞

影响效率,这样的话采用异步调用

比如result本来是用水壶烧水烧开了拿走,烧下一个

用shutdown可以将水壶一起烧但是一个一个拿走

call_done_back是一起烧,每个好了会叫你拿走做其他事

1.使用进程池时,回调函数都是主进程中执行执行

2. 使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程

3. 回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果

def get_data(url):
print("%s 正在请求%s" % (current_thread().name, url))
response = requests.get(url)
print("%s 请求%s成功" % (current_thread().name, url))
return response


def parser(obj):
res = obj.result()
htm = res.content.decode("utf-8")
ls = re.findall("href=.*?com", htm)
print("%s解析完成! 共%s个连接" % (current_thread().name,len(ls)))

if __name__ == '__main__':
urls = ["https://www.baidu.com",
"https://www.tmall.com",
"https://www.taobao.com",
"https://www.jd.com",
"https://www.python.org",
"https://www.apple.com"]
pool = ThreadPoolExecutor(3)

for i in urls:
obj = pool.submit(get_data, i)
# res = obj.result() # 会把任务变成串行
# parser(res)
obj.add_done_callback(parser)
线程池
queue 先进先出
lifoqueue 后进先出

PriorityQueue优先级设置

put((1,res))

事件


 线程事件Event

### 什么是事件

事件表示在某个时间发生了某个事情的通知信号,用于线程间协同工作。

因为不同线程之间是独立运行的状态不可预测,所以一个线程与另一个线程间的数据是不同步的,当一个线程需要利用另一个线程的状态来确定自己的下一步操作时,就必须保持线程间数据的同步,Event就可以实现线程间同步

### Event介绍

Event象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

可用方法:

```python

event.isSet():返回event的状态值;

event.wait():将阻塞线程;知道event的状态为True

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。




`python

from threading import Event,Thread

import time

e = Event()

def start():

    global boot

    print("正正在启动服务器.....")

    time.sleep(3)

    print("服务器启动完成!")

    e.set()

def connect():

    e.wait()

    print("链接成功")

    

Thread(target=start).start()

Thread(target=connect).start()

Thread(target=connect).start()

```

增加需求,每次尝试链接等待1秒,尝试次数为3次

```python

from threading import Event,Thread

import time

e = Event()

def start():

    global boot

    print("正正在启动服务器.....")

    time.sleep(5)

    print("服务器启动完成!")

    e.set()

def connect():

    for i in range(1,4):

        print("第%s次尝试链接" % i)

        e.wait(1)

        if e.isSet():

            print("链接成功")

            break

        else:

            print("第%s次链接失败" % i)

    else:

        print("服务器未启动!")

Thread(target=start).start()

Thread(target=connect).start()

# Thread(target=connect).start()

```



上一节中我们知道GIL锁将导致CPython无法利用多核CPU的优势,只能使用单核并发的执行。很明显效率不高,那有什么办法能够提高效率呢?

效率要高只有一个方法就是让这个当前线程尽可能多的占用CPU时间,如何做到?

任务类型可以分为两种 IO密集型 和  计算密集型

对于计算密集型任务而言 ,无需任何操作就能一直占用CPU直到超时为止,没有任何办法能够提高计算密集任务的效率,除非把GIL锁拿掉,让多核CPU并行执行。

对于IO密集型任务任务,一旦线程遇到了IO操作CPU就会立马切换到其他线程,而至于切换到哪个线程,应用程序是无法控制的,这样就导致了效率降低。

如何能提升效率呢?想一想如果可以监测到线程的IO操作时,应用程序自发的切换到其他的计算任务,是不是就可以留住CPU?的确如此

# 一、单线程实现并发

单线程实现并发这句话乍一听好像在瞎说

首先需要明确并发的定义

并发:指的是多个任务同时发生,看起来好像是同时都在进行

并行:指的是多个任务真正的同时进行

早期的计算机只有一个CPU,既然CPU可以切换线程来实现并发,那么为何不能再线程中切换任务来并发呢?

上面的引子中提到,如果一个线程能够检测IO操作并且将其设置为非阻塞,并自动切换到其他任务就可以提高CPU的利用率,指的就是在单线程下实现并发。

### 如何能够实现并发呢

并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发

python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!

于是乎我们可以利用生成器来实现并发执行:

```python

def task1():

    while True:

        yield

        print("task1 run")

def task2():

    g = task1()

    while True:

        next(g)

        print("task2 run")

task2()

```

并发虽然实现了,单这对效率的影响是好是坏呢?来测试一下

```python

# 两个计算任务一个采用生成器切换并发执行  一个直接串行调用

import  time

def task1():

    a = 0

    for i in range(10000000):

        a += i

        yield

def task2():

    g = task1()

    b = 0

    for i in range(10000000):

        b += 1

        next(g)

s = time.time()

task2()

print("并发执行时间",time.time()-s)

# 单线程下串行执行两个计算任务 效率反而比并发高 因为并发需要切换和保存

def task1():

    a = 0

    for i in range(10000000):

        a += i

def task2():

    b = 0

    for i in range(10000000):

        b += 1

s = time.time()

task1()

task2()

print("串行执行时间",time.time()-s)

```

可以看到对于纯计算任务而言,单线程并发反而使执行效率下降了一半左右,所以这样的方案对于纯计算任务而言是没有必要的

我们暂且不考虑这样的并发对程序的好处是什么,在上述代码中,使用yield来切换是的代码结构非常混乱,如果十个任务需要切换呢,不敢想象!因此就有人专门对yield进行了封装,这便有了greenlet模块

### greenlet模块实现并发

```python

def task1(name):

    print("%s task1 run1" % name)

    g2.switch(name) # 切换至任务2

    print("task1 run2")

    g2.switch() # 切换至任务2

def task2(name):

    print("%s task2 run1" % name)

    g1.switch() # 切换至任务1

    print("task2 run2")

g1 = greenlet.greenlet(task1)

g2 = greenlet.greenlet(task2)

g1.switch("jerry") # 为任务传参数

```

该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield还是greenlet都不能检测IO操作,遇到IO时同样进入阻塞状态,所以此时的并发是没有任何意义的。

现在我们需要一种方案 即可检测IO 又能够实现单线程并发,于是gevent闪亮登场

# 二、协程概述

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

需要强调的是:

```python

#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

```

对比操作系统控制线程的切换,用户在单线程内控制协程的切换

优点如下:

```python

#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

#2. 单线程内就可以实现并发的效果,最大限度地利用cpu

```

缺点如下:

```python

#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率

#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

```

### gevent协程的使用

```python

import gevent,sys

from gevent import monkey # 导入monkey补丁

monkey.patch_all() # 打补丁

import time

print(sys.path)

def task1():

    print("task1 run")

    # gevent.sleep(3)

    time.sleep(3)

    print("task1 over")

def task2():

    print("task2 run")

    # gevent.sleep(1)

    time.sleep(1)

    print("task2 over")

g1 = gevent.spawn(task1)

g2 = gevent.spawn(task2)

gevent.joinall([g1,g2])

```

需要注意:

1.协程执行时要想使任务执行则必须对协程对象调用join函数

2.有多个任务时,随便调用哪一个的join都会并发的执行所有任务,但是需要注意如果一个存在io的任务没有被join该任务将无法正常执行完毕

3.monkey补丁的原理是把原始的阻塞模块替换为修改后的非阻塞模块,即偷梁换柱,来实现IO自定切换,所以打补丁的位置一定要放到导入阻塞模块之前





原文地址:https://www.cnblogs.com/wrqysrt/p/10222120.html