Python Day 36 线程队列、线程事件Event、协程、Gevent 模块实现单线程并发

  ##内容回顾

#1、什么是GIL
全局解释器锁,本质就是一把互斥锁,是加到解释器身上的,每一个python进程内都有这么一把锁

#2、有了GIL会对单进程下的多个线程造成什么样的影响
多线程要想执行,首先需要争抢GIL,对所有待执行的线程来说,GIL就相当于执行权限,同一时刻只有一个线程争抢成功,即单进程下的多个线程同一时刻只有一个在运行
意味着单进程下的多线程没有并行的效果,但是有并发的效果

ps:分散于不同进程内的线程不会去争抢同一把GIL,只有同一个进程的多个线程才争抢同一把GIL
#3、为什么要有GIL
python解释器的内存管理机制不是线程安全的
#4、GIL与自定义互斥锁的区别,多个线程争抢GIL与自定义互斥锁的过程分析
相同:
    都是互斥锁
不同点:
    GIL是加到解释器上的,作用于全局
    自定义互斥锁作用于局部

    单进程内的所有线程都会去抢GIL
                单进程内的只有一部分线程会去抢自定义的互斥锁
        5、什么时候用python的多线程,什么时候用多进程,为什么?
            单进程下的多个线程是无法并行,无法并行意味着不能利用多核优势

            cpu干的计算的活,多个cpu意味提升了计算性能,
            cpu是无法做IO操作,多个cpu在IO操作面前毫无用处

            当我们的程序是IO密集型的情况下,多核对性能的提升微不足道,此时可以使用python的多线程
            当我们的程序是计算密集型的情况下,一定要用上多核优势,此时可以使用python的多进程
    2、进程池与线程池
        1、池的用途,为何要用它
            池:装固定数目的东西,东西指的是进程或者线程,让机器在自己可承受的范围内去保证一个高效的工作

  ##自己如何直接使用Thread的话 如何完成回调案例

"""
#回调函数:直接使用add_done_callback方法直接调用就好
在线程池/进程池 每次提交任务 都会返回一个表示任务的对象 Future对象 Future对象具备一个绑定方法 add_done_callback 用于指定回调函数 add 意味着可以添加多个回调函数
#使用线程Thread 自定义一个回调函数
""" from threading import Thread import time # res = None def call_back(res): print("任务结果拿到了:%s" % res) def parser(res): print("任务结果拿到了:%s" % res) def task(callback): # global res print("run") time.sleep(1) # # return 100 res = 100 # 表示任务结果 callback(res) # 执行回调函数 并传入任务结果 t = Thread(target=task,args=(parser,)) t.start() # t.join() # print(res) print("over")

  ##线程队列

#1.Queue 先进先出队列

与多进程中的Queue使用方式完全相同,区别仅仅是不能被多进程共享。

from queue import Queue,LifoQueue,PriorityQueue

# q = Queue()
#
# q.put("123")
# q.put("456")
#
# print(q.get())
# print(q.get())
#
# # print(q.get(block=True,timeout=3))
# q.task_done()
# q.task_done()
# q.join()
# print("over")

#2.LifoQueue 后进先出队列

该队列可以模拟堆栈,实现先进后出,后进先出

# 除顺序以外别的都一样
# lq = LifoQueue()
#
# lq.put("123")
# lq.put("456")
#
# print(lq.get())
# print(lq.get())

#3.PriorityQueue 优先级队列

该队列可以为每个元素指定一个优先级,这个优先级可以是数字,字符串或其他类型,但是必须是可以比较大小的类型,取出数据时会按照从小到大的顺序取出

pq = PriorityQueue()
# 数字优先级
pq.put((10,"a"))
pq.put((11,"a"))
pq.put((-11111,"a"))

print(pq.get())
print(pq.get())
print(pq.get())
# 字符串优先级
pq.put(("b","a"))
pq.put(("c","a"))
pq.put(("a","a"))

print(pq.get())
print(pq.get())
print(pq.get())

#优先级:对象比较
class A(object):
    def __init__(self,age):
        self.age = age

    # def __lt__(self, other):
    #     return self.age < other.age
    #
    # def __gt__(self, other):
    #     return self.age > other.age

    def __eq__(self, other):
        return self.age == other.age

a1 = A(50)
a2 = A(50)



print(a1 == a2)

# print(a1 is a1)


# pq = PriorityQueue()
# pq.put("a")
# pq.put("A")
# pq.put("C")
#
#



# print(pq.get())

   ##线程事件Event

### 1、什么是事件

事件表示在某个时间发生了某个事情的通知信号,用于线程间协同工作。

因为不同线程之间是独立运行的状态不可预测,所以一个线程与另一个线程间的数据是不同步的,当一个线程需要利用另一个线程的状态来确定自己的下一步操作时,就必须保持线程间数据的同步,Event就可以实现线程间同步

###2、 Event介绍

Event象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

###3、可用方法:
event.isSet():返回event的状态值;
event.wait():将阻塞线程;知道event的状态为True
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

###4、使用案例
#4-1 在链接mysql服务器前必须保证mysql已经启动,而启动需要花费一些时间,所以客户端不能立即发起链接 需要等待msyql启动完成后立即发起链接
from threading import Event,Thread
import time

boot = False
def start():
    global boot
    print("正正在启动服务器.....")
    time.sleep(5)
    print("服务器启动完成!")
    boot = True
    
def connect():
    while True:
        if boot:
            print("链接成功")
            break
        else:
            print("链接失败")
        time.sleep(1)

Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()


#4-2使用Event改造后:
“”“
from threading import Event,Thread
import time

e = Event()
def start():
    global boot
    print("正正在启动服务器.....")
    time.sleep(3)
    print("服务器启动完成!")
    e.set()

def connect():
    e.wait()
    print("链接成功")
    
Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()

”“”
#4-3增加需求,每次尝试链接等待1秒,尝试次数为3次
“”“
from threading import Event,Thread
import time

e = Event()
def start():
    global boot
    print("正正在启动服务器.....")
    time.sleep(5)
    print("服务器启动完成!")
    e.set()

def connect():
    for i in range(1,4):
        print("第%s次尝试链接" % i)
        e.wait(1)
        if e.isSet():
            print("链接成功")
            break
        else:
            print("第%s次链接失败" % i)
    else:
        print("服务器未启动!")

Thread(target=start).start()
Thread(target=connect).start()
# Thread(target=connect).start()
”“”

  ##协程

###协程理论基础###
###1、什么是协程
协程:本质是单线程实现并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

###2、为什么使用协程,如何能够实现并发呢
在Cpython多线程无法并行执行,在IO密集任务中,并且并发量较大,无法开启更多的线程时,造成后续的任务无法处理,即使前面都在等待IO

协程就可以使用单线程实现并发,当某一个任务处于IO阻塞时,就可以切换到其他的任务来执行
并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发 python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态! ###3、对比操作系统控制线程的切换,用户在单线程内控制协程的切换 1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行) 2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关) ###4、优缺点 #优点: 1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级 2. 单线程内就可以实现并发的效果,最大限度地利用cpu时间片,可以占用CPU直到超时 #缺点: 1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程 2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程 ###5、总结协程特点: 必须在只有一个单线程里实现并发 修改共享数据不需加锁 用户程序里自己保存多个控制流的上下文栈 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制)) ###6、yiled实现并发效果 # 使用生成器来实现 单线 并发多个任务 import time def func1(): a = 1 for i in range(10000000): a += 1 print("a run") time.sleep(10) yield def func2(): res = func1() a = 1 for i in range(10000000): a += 1 print("b run") next(res) st = time.time() func2() print(time.time() - st) ### 单线程下串行执行两个计算任务 效率反而比并发高 因为并发需要切换和保存 # def func1(): # a = 1 # for i in range(10000000): # a += 1 # # # def func2(): # a = 1 # for i in range(10000000): # a += 1 # # # st = time.time() # func1() # func2() # print(time.time() - st) ###7、greentlet模块实现并发 “”“ 使用yield来切换是的代码结构非常混乱,如果十个任务需要切换呢,不敢想象!因此就有人专门对yield进行了封装,这便有了greenlet模块该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield还是greenlet都不能检测IO操作,遇到IO时同样进入阻塞状态,所以此时的并发是没有任何意义的。 现在我们需要一种方案 即可检测IO 又能够实现单线程并发,于是gevent闪亮登场 ”“” def task1(name): print("%s task1 run1" % name) g2.switch(name) # 切换至任务2 print("task1 run2") g2.switch() # 切换至任务2 def task2(name): print("%s task2 run1" % name) g1.switch() # 切换至任务1 print("task2 run2") g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch("jerry") # 为任务传参数 ###8、Gevent 模块实现单线程并发 “”“ #8-1 Gevent模块的介绍 Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。 #8-2Gevent模块的用法 #用法 g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的 g2=gevent.spawn(func2) g1.join() #等待g1结束 g2.join() #等待g2结束 #或者上述两步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值 #8-3 Gevent模块默认不识别IO阻塞的如(time.sleep(3)),只能默认是识别自身的gevent.sleep(2),我们就引入了猴子补丁用法 import gevent def eat(name): print('%s eat 1' %name) gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,name='egon') g1.join() g2.join() #或者gevent.joinall([g1,g2]) print('') ###8-4 Gevent模块:猴子补丁,就可以识别IO阻塞,注意:猴子补丁语句只能放在文件开头,我们可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程 #总结 1.如果主线程结束了 协程任务也会立即结束。 2.monkey补丁的原理是把原始的阻塞方法替换为修改后的非阻塞方法,即偷梁换柱,来实现IO自动切换 ​ 必须在打补丁后再使用相应的功能,避免忘记,建议写在最上方 ##8-4 示例一 # gevent 不具备检测IO的能力 需要为它打补丁 打上补丁之后就能检测IO # 注意补丁一定打在最上面 必须保证导入模块前就打好补丁 from gevent import monkey;monkey.patch_all() import gevent from threading import current_thread import time def task1(): print(current_thread(),1) print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print(current_thread(),2) print("task2 run") print("task2 over") # spawn 用于创建一个协程任务 g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) # 任务要执行,必须保证主线程没挂 因为所有协程任务都是主线在执行 ,必须调用join来等待协程任务 # g1.join() # g2.join() # 理论上等待执行时间最长的任务就行 , 但是不清楚谁的时间长 可以全部join gevent.joinall([g1,g2]) print("over") ##8-4 示例二 import gevent,sys from gevent import monkey # 导入monkey补丁 monkey.patch_all() # 打补丁 import time print(sys.path) def task1(): print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print("task2 run") # gevent.sleep(1) time.sleep(1) print("task2 over") g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) #gevent.joinall([g1,g2]) g1.join() g2.join() # 执行以上代码会发现不会输出任何消息 # 这是因为协程任务都是以异步方式提交,所以主线程会继续往下执行,而一旦执行完最后一行主线程也就结束了, # 导致了协程任务没有来的及执行,所以这时候必须join来让主线程等待协程任务执行完毕 也就是让主线程保持存活 # 后续在使用协程时也需要保证主线程一直存活,如果主线程不会结束也就意味着不需要调用join ”“”
#####最终的解决方案
多进程+单线程+ 协程

面试问题:如何是实现高并发?
答:先分析是什么任务,如果是计算密集型,开多进程,那为什么开多进程,因为Cpython存在GIL锁,集群(所有服务器干的都是一件事)或者分布式(每个服务器就干一种活)

   

原文地址:https://www.cnblogs.com/liangzhenghong/p/10986635.html