41、进程池与线程池

一、死锁与递归锁

1.1、死锁

  死锁:锁的使用,包括抢锁以及释放锁,当两个人分别各自抢到一把锁,有需要对方的锁时就会造成死锁,即程序阻塞

  Thread:操作线程模块

  Lock:互斥锁模块

  用户1抢到了A锁,接着抢到了B锁,释放了b锁,又释放了a锁,在抢了B锁,睡眠2s,

  用户2抢到了a锁,接着要去抢B锁,但是B锁在1手上,而1需要A锁,就会造成两个人都拿不到锁。

1.2、递归锁

  需要将两个锁指向同一个目标,即同一把锁,就不会造成死锁现象

  内部有一个计数器,可以连续的加锁和释放锁:每次acquire就会加一,每次release就会减一,

  只要计数器不为零,其他人就抢不到锁

from threading import Thread, Lock
import time


mutexA = Lock()
mutexB = Lock()
#mutexA = mutexB = RLock()   #递归锁,将两把锁指向同一把锁
# 类只要加括号多次 产生的肯定是不同的对象
# 如果你想要实现多次加括号等到的是相同的对象 单例模式


class MyThead(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexB.acquire()
        print('%s 抢到B锁'% self.name)
        mutexB.release()
        mutexA.release()
        
    def func2(self):
        mutexB.acquire()
        print('%s 抢到B锁'% self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThead()
        t.start()

二、信号量:Semaphore模块

  信号量在并发编程中指的是锁,即可以多个对象同时进行加锁

  使用sm = Semaphore(n),填写的数量为可以有几个对象加锁

from threading import Thread, Semaphore
import time
import random


"""
利用random模块实现打印随机验证码(搜狗的一道笔试题)
"""
sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位


def task(name):
    sm.acquire()
    print('%s 正在蹲坑'% name)
    time.sleep(random.randint(1, 5))
    sm.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task, args=('伞兵%s号'%i, ))
        t.start()

三、Event事件

  Event模块:一些线程需要等待另一个线程执行完了之后才能执行

  event = Event()

  event.set()通知event.wait()可以执行了

from threading import Thread, Event
import time


event = Event()  # 造了一个红绿灯


def light():
    print('红灯亮着的')
    time.sleep(3)
    print('绿灯亮了')
    # 告诉等待红灯的人可以走了
    event.set()


def car(name):
    print('%s 车正在灯红灯'%name)
    event.wait()  # 等待别人给你发信号
    print('%s 车加油门飙车走了'%name)


if __name__ == '__main__':
    t = Thread(target=light)
    t.start()

    for i in range(20):
        t = Thread(target=car, args=('%s'%i, ))
        t.start()

四、线程q

4.1、进程下为什么使用队列

  线程的资源是共享的,队列 = 管道 + 锁

  所以队列使用来保护数据的安全

4.2、队列q,先进先出  Queue()

 q = queue.Queue(3)
 q.put(1)     加入
 q.get()    输出
 q.get_nowait()   
 q.get(timeout=3)   等待3秒后执行
 q.full()     判断队列中是否饱和
 q.empty()    判断队列中是否为空

4.3、堆栈q,先进后出    LifoQueue()

# q = queue.LifoQueue(3)  # last in first out
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())  # 3

4.4、优先级q    PriorityQueue(n)

  n代表可以入队的数量,入队时需要填元组,元组内第一个数字为优先级,第二个位置为内容,优先级的数字越低,级别越高

q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get())  # (-5, '444')
# put括号内放一个元祖  第一个放数字表示优先级
# 需要注意的是 数字越小优先级越高!!!

五、进程池与线程池

5.1、池的作用

  池就是在计算机保证计算机安全的前提下,最大程度的使用计算机

  由于软件的发展速度远快于硬件,因此需要在通过限制程序的运行效率来保证计算机硬件的安全,使程序能够正常运行

5.2、进程池 ProcesspoolExecutor(进程池执行者)

  需要从concurrent里面导出ProcesspoolExecotor模块,设置进程池 pool = processpoolexecotor(),不传参则默认当前计算机CPU数,在异步提交的情况下,需要有个反馈机制来提醒有返回值了,需要输入res= pool.submit(task.i),add_done_callback(callback),接着继续执行程序

5.3、线程池 ThreadpoolExecutor(线程池执行者)

  需要从concurrent里面导出ThreadpoolExecotor模块,设置线程池 pool = ThreadpoolExecotor(),不传参则默认当前计算机CPU个数*5,在异步提交的情况下,需要有个反馈机制来提醒有返回值了,需要输入res = pool.submit(task,i).add_done_callback(callback)

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os


# pool = ThreadPoolExecutor(5)  # 池子里面固定只有五个线程
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存在五个线程
这个五个线程不会出现重复创建和销毁的过程
池子造出来之后 里面会固定的几个进程
这个几个进程不会出现重复创建和销毁的过程

池子的使用非常的简单
你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
"""


def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())
"""
任务的提交方式
    同步:提交任务之后原地等待任务的返回结果 期间不做任何事
    异步:提交任务之后不等待任务的返回结果 执行继续往下执行
        返回结果如何获取???
        异步提交任务的返回结果 应该通过回调机制来获取
        回调机制
            就相当于给每个异步任务绑定了一个定时炸弹
            一旦该任务有结果立刻触发爆炸
"""
if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    # print('主')
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务
        # res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
        res = pool.submit(task, i).add_done_callback(call_back)
        # print(res.result())  # result方法   同步提交
        # t_list.append(res)
    # 等待线程池中所有的任务执行完毕之后再继续往下执行
    # pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
    # for t in t_list:
    #     print('>>>:',t.result())  # 肯定是有序的
"""
程序有并发变成了串行
任务的为什么打印的是None
res.result() 拿到的就是异步提交的任务的返回结果

六、协程

  协程并不存在与计算机内,而是程序员总结出来的。

  通过编写程序,要求IO操作在代码级别就进行切换,是CPU认为一直在执行程序,从而提高效率

6.1、协程的使用

  多道技术:保存和切换

    保存:保存上一步执行的状态,下一次接着执行之前的状态,使用yield

    切换:程序遇到IO以及程序长时间占用CPU

      切换不一定会提高效率,也有可能会降低效率:在没有等待时间的情况下,串行执行密集型的任务,频繁的切换就会降低效率,切换也是需要时间的

  TCP服务端:accept和recv

 import time

# 串行执行计算密集型的任务   1.2372429370880127
 def func1():
     for i in range(10000000):
         i + 1

 def func2():
     for i in range(10000000):
         i + 1

 start_time = time.time()
 func1()
 func2()
 print(time.time() - start_time)

 #切换 + yield  2.1247239112854004
 import time


 def func1():
     while True:
         10000000 + 1
        yield


 def func2():
     g = func1()  # 先初始化出生成器
     for i in range(10000000):
         i + 1
         next(g)

 start_time = time.time()
 func2()
 print(time.time() - start_time)

七、gevent模块

  gevent模块可以通过导入一句话,从而可以在程序中具备检测IO操作的能力,from gevent import monkey;monkey.path_all()

  通过g1 = pawn(ha)执行函数,g1.join()加入gevent模块中。

from gevent import monkey;monkey.patch_all()
"""


def heng():
    print('哼')
    time.sleep(2)
    print('哼')


def ha():
    print('哈')
    time.sleep(3)
    print('哈')

def heiheihei():
    print('heiheihei')
    time.sleep(5)
    print('heiheihei')


start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()
g2.join()  # 等待被检测的任务执行完毕 再往后继续执行
g3.join()
# heng()
# ha()
# print(time.time() - start_time)  # 5.005702018737793
print(time.time() - start_time)  # 3.004199981689453 

八、协程实现tcp服务端的并发

from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()

    
# 客户端
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:
        msg = '%s say hello %s'%(current_thread().name,n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

九、总结

  理想状态:

    多进程下面使用多线程

    多线程下面使用协程

    从而提高程序的执行效率

  

  

原文地址:https://www.cnblogs.com/jingpeng/p/12789137.html