进程互斥锁, 进程队列, 生产者消费者模式, 线程

进程互斥锁

模拟抢票软件---并发查票与串行购票

需求:

  1. 查看余票
  2. 开始购票

异步请求购票,造成数据紊乱

import multiprocessing
import time
import json


# 查询余票数据
def query(username):
    with open('余票数据.txt', 'r', encoding='utf-8') as fr:
        data = json.load(fr)
        amount = data.get('amount')
        print(f'用户{username}查看到余票还剩{amount}张')
        return amount


def buy(username):
    # 查询余票数据
    amount = query(username)
    # 判断是否有票
    if not amount > 0:
        print('余票不足!')
        return
    else:
        # 发送购买指令时模拟网络延迟
        time.sleep(1)  # 睡眠时间太短会使无法模拟异步并发效果

        amount -= 1
        with open('余票数据.txt', 'w', encoding='utf-8') as fw:
            dic = {'amount': amount}
            json.dump(dic, fw)
            print(f'{username}用户抢票成功余票还剩{amount}张')
            return


for i in range(10):
    p = multiprocessing.Process(target=buy, args=(i,))
    if __name__ == '__main__':
        p.start()

# 还剩九张票,10个用户在同一段时间内每个用户都完成了同一个数据的操作

进程互斥锁---进程间通信方式二:通过存取第三方文件数据

  • 把锁住的代码变成串行

    import multiprocessing
    import time
    import json
    
    
    # 查询余票数据
    def query(username):
        # 查票模拟网络延迟
        time.sleep(1)  # 睡眠时间太短会无法模拟异步并发效果
    
        with open('余票数据.txt', 'r', encoding='utf-8') as fr:
            data = json.load(fr)
            amount = data.get('amount')
            print(f'用户{username}查看到余票还剩{amount}张')
    
    
    def change(username):
        # 查票模拟网络延迟
        time.sleep(1)  # 睡眠时间太短会无法模拟异步并发效果
    
        with open('余票数据.txt', 'r', encoding='utf-8') as fr:
            data = json.load(fr)
            amount = data.get('amount')
    
        # 判断是否有票
        if not amount > 0:
            print('余票不足!')
            return
    
        # 发送购买指令时模拟网络延迟
        time.sleep(1)  # 睡眠时间太短会无法模拟异步并发效果
    
        amount -= 1
        with open('余票数据.txt', 'w', encoding='utf-8') as fw:
            dic = {'amount': amount}
            json.dump(dic, fw)
            print(f'{username}用户抢票成功余票还剩{amount}张')
            return
    
    
    def buy(username, mutex):
        # 并发,异步执行查票任务
        query(username)
    
        # 加互斥锁
        mutex.acquire()
    
        # 串行,同步执行抢票任务
        change(username)
    
        # 解互斥锁
        mutex.release()
    
    
    # 实例化得到多进程锁对象
    mutex = multiprocessing.Lock()  # 在主进程实例化锁对象,将其传入子进程执行的目标中,使每个被执行的目标拿到的是同一把锁
    
    # 实例化出多个抢票进程
    for i in range(10):
        p = multiprocessing.Process(target=buy, args=(i, mutex))
        if __name__ == '__main__':
            p.start()  # 运行10个进程
    
    

线程互斥锁--->with mutex: ---自动管理锁的添加释放

import threading
import time

x = 20


def task(i):
    print(f'线程{i}启动')
    global x

    mutex.acquire()  # 加线程锁
    temp = x
    time.sleep(0.1)  # 阻塞0.1秒使得10个线程同时拿到x=20
    x = temp - 1
    print(x)
    mutex.release()  # 释放线程锁


if __name__ == '__main__':
    mutex = threading.Lock()  # 线程锁对象

    t_lt = []
    for i in range(10):
        t = threading.Thread(target=task, args=(i,))
        t.start()
        t_lt.append(t)

    for t in t_lt:
        t.join()

    print('last', x)  # 不加线程锁19,加线程锁10

进程锁与join的区别

进程锁是把子进程执行的目标中锁住的代码变成串行, start + join构造的串行是将子进程整体变成了串行

进程队列

是什么?

  • 内存中产生的一个队列空间

作用

  • 存取数据,遵循先进先出原则

性质属性

  • 数据取一个少一个
    • 管道: pipe 基于共享的内存空间实现
    • 队列: pipe + 锁(数据需一个一个取)
  • 添加数据 q.put(1) 若队列满了会原地等待
  • 不等待添加数据 q.put_nowait(2) 若队列满了,会不等待直接报错
  • 获取数据 q.get() 若队列中没有数据会原地等待
  • 不等待获取数据 q.get_nowait() 若队列中没有数据,会不等待直接报错
  • 判断队列是否为空 q.empty()
  • 不等待获取数据 q.get_nowait() 若队列中无数据,会直接报错
  • 判断队列是否为空 q.empty()
  • 判断队列是否满了 q.full()
  • q.get/put(block = True, timeout = n) block:是否阻塞等待, timeout 最多等待多少秒,超时则报错
import multiprocessing

q = multiprocessing.Queue(2)  # 参数控制队列数据个数,默认值为无数个

q.put(1)
print('放入第一个数')
q.put(2)
print('放完第二个数')
# q.put(3)  # 卡主
# print('向放第三个数')
# q.put_nowait(3)  # 报错:queue.Full

q.get()
print('取出第一数')
q.get()
print('取出第一数')
# q.get()  # 卡主
# print('想取第三个数')
# q.get_nowait()  # 报错:queue.Empty

print(q.empty())  # True
print(q.full())  # False

IPC进程间通信---进程间通信方式一:通过内存中的队列交互数据

  • 进程间数据是相互隔离的,但是可以通过队列(在内存中)实现进程间通信
import multiprocessing


class MyProcess1(multiprocessing.Process):
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        data = f'进程{multiprocessing.current_process().pid}的数据'
        self.q.put(data)
        print(f'进程{self.pid}将数据放入了队列')


class MyProcess2(multiprocessing.Process):
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        data = self.q.get()
        print(f'进程{self.pid}取出了{data}')


q = multiprocessing.Queue(1)
p1 = MyProcess1(q)
p2 = MyProcess2(q)

if __name__ == '__main__':
    p1.start()
    p2.start()

'''
进程13304将数据放入了队列
进程15112取出了进程13304的数据
'''

生产者与消费者模式

什么是生产者消费者模式

生产者进程和消费者进程彼此之间通过队列的阻塞性实现通信,队列就相当于一个缓冲区,用来解决生产者和消费者数据处理能力不同的问题, 使得两者都达到自己最大的效率

queue == 管道 + 锁, 管道基于共享的内存空间, 所以 queue不适合传大文件

  • 生产者消费者数量对等

    import multiprocessing
    import random
    import time
    
    
    def produce(q):
        for i in range(10):
            time.sleep(random.randint(0, 1))  # 模拟一个生产者10次不同的数据生产速度
            data = random.randint(1, 100)
            q.put(data)
            print(f'生产了数据{data}')
    
        q.put(None)  # 生产者生产完毕标识
    
    
    def consume(q):
        while True:
            time.sleep(random.randint(0, 2))  # 模拟一个消费者每次不同的数据消费速度
            data = q.get()
    
            # 判断数据是否消费完毕
            if data == None:
                print('数据消费完毕')
                return
    
            print(f'消费了数据{data}')
    
    
    q = multiprocessing.Queue()
    
    p = multiprocessing.Process(target=produce, args=(q,))
    c = multiprocessing.Process(target=consume, args=(q,))
    
    if __name__ == '__main__':
        p.start()
        c.start()
    
    # 生产者消费者人数不对等时,若不使用JoinableQueue,可以对标识None做一个计数判断
    
  • JoinableQueue ---解决多生产者和多消费者数量不等的问题

    • JoinableQueue 创建可连接队列, 并且底层创建了一个计数器对队列中的数据计数
    • q.put() 对队列中的数据计数 + 1
    • q.task_done() 对队列中的数据计数 -1
    • q.join() 当队列中的数据计数不为0时阻塞, 当队列中的数据计数为0时通过
    import multiprocessing
    import random
    import time
    
    
    def produce(q, number):
        for i in range(5):
            time.sleep(random.randint(0, 1))  # 模拟一个生产者10次不同的数据生产速度
            data = random.randint(100, 200)
            q.put(data)  # 队列中的数据计数 +1
            print(f'生产者{number}生产了数据{data}')
    
    
    def consume(q, number):
        while True:
            time.sleep(random.randint(0, 2))  # 模拟一个消费者每次不同的数据消费速度
            data = q.get()
            q.task_done()  # 队列中的数据计数 -1
    
            print(f'消费者{number}消费了数据{data}')
    
    
    if __name__ == '__main__':
        q = multiprocessing.JoinableQueue()
    
        lt = []
        for i in range(1, 4):
            p = multiprocessing.Process(target=produce, args=(q, i))
            p.start()
            lt.append(p)
    
        for i in range(1, 3):
            c = multiprocessing.Process(target=consume, args=(q, i))
            c.daemon = True  # 将消费者进程定义为守护进程,一旦主进程结束,即最后一行代码判断队列中数据消费完毕则关闭消费者进程
            c.start()
    
        for p in lt:
            p.join()  # 主进程原地等待所有生产者生产完数据
    
        q.join()  # 主进程判断队列中数据是否全部消费完毕
    
    

线程

右键运行后的过程

进程

  1. 申请内存空间
  2. 把解释器放入内存空间运行
  3. 把代码放入内存空间
  4. 解释器读入代码并解释

线程

  1. 运行经解释器解释后的代码

类比:

操作系统:工厂 进程:车间 线程:流水线 cpu:电源

进程与线程的区别---线程单指代码执行的过程

  • 进程是资源单位,用来把资源集中到一起, 进程运行 = 各种资源 + 线程, 线程才是CPU的真正的也是最小的执行单位, 线程运行 = 运行代码

  • 创建一个进程时 (速度慢) 需要向操作系统申请开辟内存空间并且会默认创建一个主线程, 创建一个线程只是告诉操作系统一个代码执行方案---速度快

    import multiprocessing
    import threading
    import time
    
    
    def task1():
        print('子进程开启')
        time.sleep(0.1)
        print('子进程结束')
    
    
    def task2():
        print('子线程开启')
        time.sleep(0.1)
        print('子线程结束')
    
    
    if __name__ == '__main__':
        p = multiprocessing.Process(target=task1)
        p.start()
    
        t = threading.Thread(target=task2)
        t.start()
    
        print('主线程结束')
    
    # 主线程结束,主进程并未结束需等所有子线程结束主进程才结束
    # 开启线程是立即执行,开启进程会需要一定的申请内存空间并初始化的时间从而延迟执行
    # 3,4的打印顺序取决于sleep的时间与开进程的时间之间的长短
    '''
    子线程开启  # 1
    主线程结束  # 2
    子线程结束  # 3
    子进程开启  # 4
    子进程结束  # 5
    '''
    
    
  • 进程间的内存空间是相互隔离的(即使子进程拥有父进程的数据段副本), 而线程间是共享同一个进程中的空间资源

    • 父进程发生变化不会对子进程及其他进程产生影响, 主线程发生变化可能会影响到其他线程的行为
  • 进程间对CPU的使用是竞争关系, , 而同一个进程的线程间是合作关系

  • 进程只能控制自己的子进程创建, 但线程可以控制同一进程下的同级线程

为什么要用线程

  • 创建线程的时间和空间开销远小于创建进程
  • 若多个需执行的任务存在频繁I/O处理, 采用多线程执行更有优势, 若多个需执行的任务是CPU密集型任务, 多线程处理并不能获得性能上的增强

如何创建线程

  • 方式一:通过导入threading模块中的Thread类创建

    import threading
    import time
    
    
    # 定义线程目标任务
    def task(i):
        print(f'线程{i}开启')
        for j in range(10):
            i += 1
        time.sleep(1)
        print('线程结束')
    
    
    # 使用线程模块中的线程类实例化得到一个线程对象并指定需要执行的目标任务
    t = threading.Thread(target=task, args=(0,))  # args控制目标任务的参数输入
    
    # 开启一个线程并执行目标任务
    if __name__ == '__main__':  # 开启一个线程实际是不需要防止递归申请内存空间,但是为了统一书写加上判断语句
        t.start()
    
    
  • 方式二:自定义MyThread类, 但必须要继承hreading模块中的Thread

    import threading
    import time
    
    
    # 自定义线程类
    class MyThread(threading.Thread):  # 自定义类需继承threading模块中的Thread类
        def __init__(self, i):  # 派生初始化时的额外数据属性,控制线程执行任务的参数输入
            super().__init__()  # 复用父类Thread的初始化数据属性
            self.i = i  # 额外数据属性i
    
        def run(self):
            print(f'线程{self.i}开启')
            for i in range(1000):
                self.i += 1
                time.sleep(0.01)
            print('线程结束')
    
    
    t = MyThread(1)
    if __name__ == '__main__':
        t.daemon = True  # 守护线程参数
        t.start()
    # t.join()
    time.sleep(1)
    print(t.i)  # 61/60
    
    

多进程与多线程执行不同类型的任务对比

cpython无法实现多线程并行,只能实现多线程并发

面试如果问,回答:公司是做计算密集型项目多还是做I/O密集型项目多

import threading
import time
import multiprocessing
import os


# 定义计算密集型任务
def task1():
    number = 0
    for j in range(10000000):
        number += 1


# 定义I/O密集型任务
def task2():
    time.sleep(1)


if __name__ == '__main__':
    print(os.cpu_count())  # 4核
    start = time.time()
    # 创建50个进程/线程
    lt = []
    for i in range(50):
        # p = multiprocessing.Process()  # 开启50个进程不执行任务的时间:5.307905197143555
        # p = multiprocessing.Process(target=task1)  # 50个进程执行计算密集型任务时间:11.739251852035522  时间差:6.431346654891968
        # p = multiprocessing.Process(target=task2)  # 50个进程执行I/O密集型任务时间:6.573248863220215  时间差:1.0519602298736572

        # p = threading.Thread()  # 开启50个线程不执行任务的时间:0.015621185302734375
        # p = threading.Thread(target=task1)  # 50个线程执行计算密集型任务时间:23.993163585662842  时间差:23.977542400360107
        p = threading.Thread(target=task2)  # 50个线程执行计算I/O型任务时间:1.0519602298736572  时间差:1.0363390445709229
        p.start()
        lt.append(p)

    # 开启50个进程并开始计时
    for p in lt:
        p.join()
    end = time.time()
    print(f'运行时间:{end-start}')

在计算密集型的情况下使用多进程,

在I/O密集型的情况下使用多线程,优势:消耗资源少,时间上差不多

高效执行使用 多进程 + 多线程

线程对象的方法

t.daemon = True 守护线程,守护的是整个进程的运行周期

from threading import Thread  # 多次导入同一模块中的类或方法只执行一次模块中的代码
import threading
import time


def task():
    print('开始...')
    print("*" * 100, threading.current_thread())

    time.sleep(1)

    print(threading.current_thread())  # 获取当前线程对象  # <Thread(蔡启龙, started 43020)>
    print("*" * 100, threading.current_thread())

    print(threading.enumerate())  # 获取当前活跃的线程列表
    # [<_MainThread(MainThread, stopped 59888)>, <Thread(蔡启龙, started 24176)>, <Thread(Thread-2, started 4388)>]
    print("*" * 100, threading.current_thread())

    print(f'结束...{threading.current_thread()}')
    print("*" * 100, threading.current_thread())


if __name__ == '__main__':
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t1.start()
    t2.start()

    print(t1.is_alive())  # 判断子线程是否存活  # True
    print('*' * 100)

    print(threading.enumerate())  # 获取当前活跃的线程列表
    # [<_MainThread(MainThread, started 59888)>, <Thread(Thread-1, started 24176)>, <Thread(Thread-2, started 4388)>]
    print('*' * 100)

    print(threading.active_count())  # 获取当前活跃的线程个数  # 3
    print('*' * 100)

    # t1.join()  # 等待被join的子线程结束再运行下面代码

    t1.setName('蔡启龙')  # 设置线程名
    print(t1.getName())  # 获取线程名  # 蔡启龙
    print('*' * 100)

线程互斥锁

线程之间共享同一个进程中的数据

# 线程之间共享同一个进程中的数据
x = 100


def task1():
    global x
    x = '200'


t = threading.Thread(target=task1)
t.start()
print(x)  # 200

线程互斥锁

import threading

x = 0
mutex = threading.Lock()  # 线程互斥锁牺牲执行效率保证了数据安全


def task():
    global x

    # mutex.acquire()

    for i in range(200000):
        x = x + 1
        '''
        1. 某次 t1 刚拿到 x = 1000 还没进行计算就报错状态被切换到t2
        2. 切换到 t2 后 t2 拿到 x = 1000 进行 +1 运算并赋值,使得 x = 1001
        3. 此时切回 t1 计算 1000 + 1 = 1001 并赋值, 使得 x = 1001
        4. 以供进行了两次 +1 操作,但实际结果只加了 1
       '''

    # mutex.release()


if __name__ == '__main__':  # python使用的是操作系统原生线程
    t1 = threading.Thread(target=task)
    t2 = threading.Thread(target=task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    print(x)  # 不加线程互斥锁:328787  # 加线程互斥锁:400000  # 主线程结束后会等待子线程结束

原文地址:https://www.cnblogs.com/-406454833/p/11753269.html