多进程

multiprocessing模块下Process、Lock

进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。

无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发

并行:同时运行,只有具备多个cpu才能实现并行

开启子进程

方式一:实例化Process对象,指定参数target,args

方式二:重构Process类的run方法,直接实例化

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(3)
    print('%s is done' %name)

if __name__ == '__main__':
    # p = Process(target=task,kwargs={'name':'子进程1'})
    p = Process(target=task,args=('子进程1',))
    p.start() #仅仅只是给操作系统发送了一个信号

    print('')
方式一
from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 方法名一定要叫run
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is done' % self.name)


# windows系统里开进程一定要放在if __name__ == '__main__':

if __name__ == '__main__':
    p = MyProcess('子进程1')
    p.start()  # 向操作系统发信号,告诉操作系统开启子进程
    print('')
方式二

查看进程号pid

os.getpid():当前文件的进程id
os.getppid():当前文件的父进程id,即pycharm
windows 系统cmd查看进程id:tasklist | findstr pycharm(进程名)
       杀死进程:taskkill /F /PID 进程号xxx
from multiprocessing import Process
import time, os


def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    p = Process(target=task, )
    p.start()

    print('', os.getpid(), os.getppid())
    # os.getpid():当前文件的进程id
    # os.getppid():当前文件的父进程id,即pycharm
    '''
    windows 系统cmd查看进程id命令:
    tasklist | findstr pycharm
    '''
View Code

join方法

# join方法
# from multiprocessing import Process
# import time,os
#
# def task():
#     print('%s is running,parent id is <%s>' %(os.getpid(),os.getppid()))
#     time.sleep(3)
#     print('%s is done,parent id is <%s>' %(os.getpid(),os.getppid()))
#
# if __name__ == '__main__':
#     p=Process(target=task,)
#     p.start()
""""""
#     p.join()  # 进程p执行结束以后程序才会往下走
#     print('主',os.getpid(),os.getppid())
#     print(p.pid)  # 验证僵尸进程,子进程已结束但pid未回收,
#                   # 如果主进程pid被回收,子进程pid也被回收


# from multiprocessing import Process
# import time,os
#
# def task(name,n):
#     print('%s is running' %name)
#     time.sleep(n)
#
#
# if __name__ == '__main__':
#     start=time.time()
#     p1=Process(target=task,args=('子进程1',5))
#     p2=Process(target=task,args=('子进程2',3))
#     p3=Process(target=task,args=('子进程3',2))
#     p_l=[p1,p2,p3]
#
#     # p1.start()  # 向操作系统发信号,告诉操作系统开启子进程
#     # p2.start()  # 但是操作系统什么时候开,先开谁
#     # p3.start()  # 不知道
#     for p in p_l:
#         p.start()
#
#     # p1.join()
#     # p2.join()
#     # p3.join()
#     for p in p_l:
#         p.join()
#
#     print('主',(time.time()-start))


# from multiprocessing import Process
# import time,os
#
# def task(name,n):
#     print('%s is running' %name)
#     time.sleep(n)
#
#
# if __name__ == '__main__':
#     start=time.time()
#     p1=Process(target=task,args=('子进程1',5))
#     p2=Process(target=task,args=('子进程2',3))
#     p3=Process(target=task,args=('子进程3',2))
#
#     p1.start()
#     p1.join()
#     p2.start()
#     p2.join()
#     p3.start()
#     p3.join()
#
#     print('主',(time.time()-start))


# 了解:is_alive
from multiprocessing import Process
import time, os


def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    # p=Process(target=task,)
    # p.start()
    # # print(p.is_alive()) # p进程是否结束
    # p.join()
    # print('主',os.getpid(),os.getppid())
    # print(p.pid)
    # # print(p.is_alive())

    p = Process(target=task, name='sub——Precsss')
    p.start()
    p.terminate()  # 向系统发信号:"干掉p进程"。系统什么时候执行?不知道
    time.sleep(3)
    print(p.is_alive())   # 进程p是否还活着
    print('')
    print(p.name)  # 进程名,在产生进程的时候可以设置,否则使用默认值
#                  # 进程p已被干掉但进程名,进程id还在
View Code

守护进程:p.daemon=True   在p.start()之前设置

    主进程结束则整个程序结束,不会再等子进程执行完

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(2)
    p=Process(target=time.sleep,args=(3,))
    p.start()


if __name__ == '__main__':
    p=Process(target=task,args=('子进程1',))
    p.daemon=True  # 设置p为守护进程,一定要在start前设置
    p.start()      # 守护进程内不能再开子进程

    p.join()
    print('')
View Code

互斥锁:多个进程时,牺牲效率,保证数据安全

  在开启进程之前先生成锁,之后抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢

# 互斥锁:牺牲效率,保证数据安全
from multiprocessing import Process, Lock
import time


def task(name, mutex):
    mutex.acquire()  # 获取锁
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)
    mutex.release()  # 释放锁


if __name__ == '__main__':
    mutex = Lock()  # 生成锁
    for i in range(3):  # 3个task抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢
        p = Process(target=task, args=('进程%s' % i, mutex))
        p.start()
View Code
if __name__ == '__main__':
    # mutex = Lock()  # 生成锁
    for i in range(3):  # 3个task抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢
        p = Process(target=task, args=('进程%s' % i, Lock() ))
        p.start()


可不可以这样呢?如果Lock() 是单例模式,那么没问题,否则不可。三个进程拿到三把不同的锁,就没有意义了。
question

join():把整个程序变成串行
Lock():把并行的程序,需要串行的地方加锁改成串行

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

  1、效率低(共享数据基于文件,而文件是硬盘上的数据)

  2、需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

  1、效率高(多个进程共享一块内存的数据)

  2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

队列:Queue

  先进先出

from multiprocessing import Queue

q = Queue(3)

q.put('hello')
q.put({'a': 1})
q.put([3, 3, 3, ])
print(q.full())

# q.put(4)

print(q.get())
print(q.get())
print(q.get())
print(q.empty())
View Code
from multiprocessing import Process, Queue
import time


def producer(q):
    for i in range(3):
        res = '包子%s' % i
        time.sleep(0.5)
        print('生产者生产了%s' % res)

        q.put(res)


def consumer(q):
    while True:
        res = q.get()
        # if res is None:break
        if not res: break
        time.sleep(1)
        print('消费者吃了%s' % res)


if __name__ == '__main__':
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()  # 等所有的生产者生产完毕以后,向队列里放入结束信号
    q.put(None)  # 有几个消费者放几个结束信号
    q.put('')
    print('')
View Code

JoinableQueue

from multiprocessing import Process, JoinableQueue
import time


def producer(q):
    for i in range(2):
        res = '包子%s' % i
        time.sleep(0.5)
        print('生产者生产了%s' % res)

        q.put(res)
    q.join()


def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消费者吃了%s' % res)
        q.task_done()


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('')
View Code
这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
原文地址:https://www.cnblogs.com/webc/p/9167970.html