1022 笔记

一 .进程互斥锁

并发变串行

让并发变成串行,牺牲了执行效率,保证了数据的安全在程序并发执行时,需要修改数据时使用.

运行时出现全部购票成功的情况(余票为1),是因为并发编程,每个子进程都获得res为1,每个都以自己的res执行get,最后成功购票,可以在p.start后面加上p.join使其变为串行,前面结束才能运行后面的
from multiprocessing import Process
import json,time

def search():
    '''查询余票'''
    time.sleep(1)
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
        print(f"还剩{res.get('count')}")

def get():
    '''购票'''
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
    time.sleep(1)  # 模拟网络io延迟
    if res['count'] > 0:
        res['count'] -= 1
        with open('db.txt', 'w', encoding='utf-8') as f:
            json.dump(res,f)
            print('抢票成功')
        time.sleep(1)
    else:
        print('车票已经售罄')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(10):
        p =Process(target=task)
        p.start()
        # 加上join运行结果正确
        p.join()  

lock进程锁/互斥锁

使第一个进程进去第二个必须等待结束才能进,把锁住的代码变成了串行.

lock.acquire()加锁, lock.release()解锁

# 设置进程串行,进程锁/互斥锁,使第一个进程进去第二个必须等待结束才能进
from multiprocessing import Process
import json,time
from multiprocessing import Lock

def search():
    time.sleep(1)
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
        print(f"还剩{res.get('count')}")

def get():
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
    time.sleep(1)  # 模拟网络io
    if res['count'] > 0:
        res['count'] -= 1
        with open('db.txt', 'w', encoding='utf-8') as f:
            json.dump(res,f)
            print('抢票成功')
        time.sleep(1)
    else:
        print('车票已经售罄')

def task(lock):
    search()
    # lock = Lock()  # 写在主进程时为了让子进程拿到同一把锁
    # 锁住
    lock.acquire()
    get()   # 同一时间只能一个进程执行get()
    # 释放锁
    lock.release()


if __name__ == '__main__':
    lock = Lock()   # 写在主进程时为了让子进程拿到同一把锁
    for i in range(10):
        p =Process(target=task,args=(lock,))  # 将lock当做参数传入
        p.start()

总结

进程锁: 是把锁住的代码变成了串行

join: 是把所有的子进程变成了串行

二 .IPO机制

实现进程间的通信:队列和管道

1.管道

pipe: 基于共享的内存空间

2.队列

Queue: pipe+锁 from multiprocessing import Queue

队列:先进先出

相当于内存中的产生一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排在前面

堆栈:先进后出

1.Queue()

调用队列类,实例化对象q

q = Queue(5)   #若传参队列中可以放5个数据
q = Queue()  	#若不传参,队列中可以存放无限大的数据,前提是硬件跟得上

2.put()

添加数据,若队列中的数据满了放值会阻塞

from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3])  # 阻塞住

========================================================

from multiprocessing import Queue
q = Queue(2)
#  block=true  是默认会阻塞, timeout = 2  等待时间超时2s
q.put('风',block=True,timeout=2)
q.put('风',block=True,timeout=2)
# 如果满了会进行等待,但最多等待2s否则报错
q.put('风',block=True,timeout=2)

3.q.get()

遵循先进先出

from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3])  

print(q.get())  # 山
print(q.get())  # 水
print(q.get())  # [1, 2, 3]
print(q.get())  # 默认存值没有就会阻塞在这

4.empty()

判断队列是否为空

print(q.empty())  # False

5.q.get_nowait()

获取数据,队列中若没有则会报错

print(q.get_nowait())

6.q.put_nowait()

添加数据 若队列满了, 则会报错

q.put_nowait(6)

7.q.full()

判断队列是否满

print(q.full())  # True

3.进程间通信

进程间数据是相互隔离的,若想实现进程间通信,可以利用队列.

可以利用进程的队列,创建数据以及获取数据(Queue不适合传大文件)

from multiprocessing import Process,Queue

def test1(q):
    data = f'你好啊,赛利亚'
    # 将数据data传入队列中
    q.put(data)
    print('进程1开始添加数据到队列中...')

def test2(q):
    data = q.get()
    print(f'进程2从队列中获取数据[{data}]')

if __name__ == '__main__':
    # 获得队列的对象
    q = Queue()
    # 获取进程对象,并将队列对象传入
    p1 = Process(target=test1,args=(q,))
    p2 = Process(target=test2,args=(q,))

    # 启动进程1,2
    p1.start()
    p2.start()
'''
进程1开始添加数据到队列中...
进程2从队列中获取数据[你好啊,赛利亚]
'''

三 . 生产者消费者模型

生产者:生产数据的

消费者:使用数据的

生产者 --- 队列(容器) --- 消费者

通过队列,生产者把数据添加进去,消费者从队列中获取(不适合传大文件)

生产者可以不停的生产,并可以存放在容器队列中,消费者也可以不停的取,没有因生产者的生产效率低下而等待(一种设计思想)

简单终结生产版

from multiprocessing import Queue,Process

def producer(q,name,food):
    '''生产者'''
    for i in range(10):
        print(f'{name}生产出{food}数据{i}')
        # 传输的消息
        res = f'{food}{i}'
        # 放入队列中
        q.put(res)
    # 循环结束,传入q结束生产
    q.put('q')


def consumer(q,name):
    '''消费者'''
    # 消费者不停的获取
    while True:
        # 从队列中获取
        res = q.get()
        if res == 'q':
            break
        print(f'{name}消费使用了{res}数据')

if __name__ == '__main__':
    q = Queue() # 创建队列

    p1 = Process(target=producer,args=(q,'小明','电影'))  # 传入子进程中
    p2 = Process(target=consumer,args=(q,'小红'))

    p1.start()
    p2.start()

加强版

from multiprocessing import Queue,Process

def producer(q,name,food):
    '''生产者'''
    for i in range(3):
        print(f'{name}生产出{food}数据{i}')
        # 传输的消息
        res = food,i
        # 放入队列中
        q.put(res)



def consumer(q,name):
    '''消费者'''
    # 消费者不停的获取
    while True:
        # 从队列中获取
        res = q.get()
        # 判断是否有数据
        if not res:
            break
        print(f'{name}消费使用了{res}数据')

if __name__ == '__main__':
    q = Queue() # 创建队列

    p1 = Process(target=producer,args=(q,'小明','电影'))  # 传入子进程中
    p2 = Process(target=producer,args=(q,'小黑','书籍'))  # 传入子进程中
    p3 = Process(target=producer,args=(q,'小亮','动漫'))  # 传入子进程中

    c1 = Process(target=consumer,args=(q,'小红'))
    c2 = Process(target=consumer,args=(q,'小绿'))

    p1.start()
    p2.start()
    p3.start()

    # 将c1,2设置为守护者模式
    c1.daemon = True
    c2.daemon = True

    c1.start()
    c2.start()
    # join 设置子程序结束后主程序再结束
    p3.join()
    print('主')

四 .线程

1.定义

线程与进程都是虚拟单位,目的是为了更好的描述某种事物

进程:资源单位

进程是程序的分配资源的最小单元;一个程序可以有多个进程,但只有一个主进程;进程由程序、数据集、控制器三部分组成。

线程:执行单位

线程是程序最小的执行单元;一个进程可以有多个线程,但是只有一个主线程;线程切换分为两种:一种是I/O切换,一种是时间切换

开启一个进程一定会有一个线程,线程才是真正执行单位

2.为什么要使用线程

节省内存资源

开启进程

  1. 开辟一个名称空间,每开启一个进程都会占用一份内存资源

  2. 会自带一个线程

开启线程

  1. 一个线程可以开启多个线程,
  2. 线程的开销远小于进程,

注意: 线程不能实现并行,线程只能实现并发.

3.线程的创建

from threading import Thread调用线程

线程不需要在if __name__ == '__main__':下启动也可以执行.

方式1

实例化线程对象

from threading import Thread
import time

def task():
    print('线程开启')
    time.sleep(1)
    print('线程结束')

# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
    # 实例化线程t
    t = Thread(target=task)
    t.start()

方式2

创建类

from threading import Thread
import time
class MyTh(Thread):
    def run(self):
        print('线程开启')
        time.sleep(1)
        print('线程结束')
if __name__ == '__main__':
    t = MyTh()
    t.start()

4.属性方法

主进程像是工厂,子进程是车间,线程则是流水线

from threading import current_thread导入属性方法

1.current_thread.name()

获得线程号

from threading import Thread
import time
from threading import current_thread

def task():
    print(f'线程开启{current_thread().name}')
    time.sleep(1)
    print(f'线程结束{current_thread().name}')

# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
    for i in range(3):
        # 实例化线程t
        t = Thread(target=task)
        t.start()

2.isAlive

判断线程是否存活

def task():
    print(f'线程开启{current_thread().name}')
    time.sleep(1)
    print(f'线程结束{current_thread().name}')

# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
    # 实例化线程t
    t = Thread(target=task)
    print(t.isAlive())  # false
    t.start()
    print(t.isAlive())  # True
    print(t.is_alive()) # True

3. .daemon = True

守护的是进程的运程周期

from threading import Thread
import time

def task():
    print('线程开启')
    time.sleep(0.1)
    print('线程结束')
if __name__ == '__main__':
    # 实例化线程t
    t = Thread(target=task)
    t.daemon = True
    t.start()
    print('主')

'''# 主进程结束线程即结束
线程开启
主'''

4.线程互斥锁

线程之间的数据是共享的

# 每个线程都会执行task修改操作,但每个人都是并发的,所以使用加锁进来串行操作

from threading import Thread
import time

'''
线程之间数据是共享的.
'''


from threading import Thread, Lock
import time

mutex = Lock()

n = 100


def task(i):
    print(f'线程{i}启动...')
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)  # 一共等待10秒
    n = temp-1
    print(n)
    mutex.release()

if __name__ == '__main__':
    t_l=[]
    for i in range(100):
        t = Thread(target=task, args=(i, ))
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    # 100个线程都是在100-1
    print(n)

原文地址:https://www.cnblogs.com/fwzzz/p/11721928.html