多进程

#创建多进程的用法和常用的属性
from multiprocessing import Process
import time
import os
def func1():
    # time.sleep(6)
    print('这里是孙子进程,孙子自己的pid是%s,孙子的父进程的pid是%s'%(os.getpid(),os.getppid()))

def func():
    p1 = Process(target=func1)
    p1.start()
    # p1.terminate()                            #如果父进程没有执行,程序可以执行到这里,若是放在这里则杀死进程
    # p1.join()                                  #join的位置在
    # time.sleep(5)
    print('这里是儿子进程,儿子自己的pid是%s,儿子的父进程的pid是%s'%(os.getpid(),os.getppid()))

# os.getpid()获取的是当前进程自己的pid
# os.getppid()获取的是当前进程的父进程的pid
if __name__ == '__main__':
    # for i in range(2):
    p = Process(target=func)          # 实例化一个进程对象,括号内可以给子进程传参数 args=(xxx,)  注意是元组的形式

    # p.daemon = True                 #将子进程进程设置成守护进程,必须要在start之前设置为True
                                      #守护进程会随着父进程的终结而终结:守护进程不会再创建子进程
    p.start()                         # 开启一个子进程(可以理解成就绪状态),实际调用run方法执行(理解成执行状态)
    # p.terminate()                   #杀死子进程,放在创建父进程strat之后,杀死所有进程,放子进程中,则杀死所有子进程
    # c= p.pid                        #返回子进程的pid,要用变量接住,操作系统随机分配的数字
    # print(c)

    # p.join()  #将异步实现同步,等待子进程执行完毕,再执行父进程,可以理解成阻塞,半不执行
                # 父进程执行join就是同步,不执行join就是异步关系,必须放在star之后
    print('这里是父亲进程,父进程自己的pid是:%s,父亲的父亲的pid是%s'%(os.getpid(),os.getppid()))
View Code

 一,创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
View Code

  参数介绍

1 group参数未使用,值始终为None
2 
3 target表示调用对象,即子进程要执行的任务
4 
5 args表示调用对象的位置参数元组,args=(1,2,'egon',)
6 
7 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
8 
9 name为子进程的名称
View Code

  方法介绍

 1 p.start():启动进程,并调用该子进程中的p.run() 
 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
 3 
 4 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
 5 p.is_alive():如果p仍然运行,返回True
 6 
 7 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  
View Code

  属性介绍

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 
3 p.name:进程的名称
4 
5 p.pid:进程的pid
6 
7 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
8 
9 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
View Code

二,进程间内存是相互隔离的,开启进程的两种方式:

  方法一:

#开进程的方法一:
import time
import random
from multiprocessing import Process
def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)



p1=Process(target=piao,args=('egon',)) #必须加,号
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('wupeqi',))
p4=Process(target=piao,args=('yuanhao',))

p1.start()
p2.start()
p3.start()
p4.start()
print('主线程')
View Code

  方法二:

#开进程的方法二:
import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

p1=Piao('egon')
p2=Piao('alex')
p3=Piao('wupeiqi')
p4=Piao('yuanhao')

p1.start() #start会自动调用run
p2.start()
p3.start()
p4.start()
print('主线程')
View Code

三,在windows中Process()必须放到    if __name__ == '__main__'  下面

四,进程中保证信息安全的锁机制:

   from  multiprocessing  import   Lock

   l = Lock()   ====>  l.acquire()   获取钥匙,隔离其他进程         l.release()   释放钥匙等待其他进程获取 

                       ====>  理解; 这个锁机制就是为了保证数据在运行过程中的安全性而存在的

                                             这个所机制的可以一部分代码(修改共享数据的)串行,有时候位置的摆放会缩减程序的执行效率.

                                             单个进程内指定多个任务去抢同一把锁,作用于局部

示例1:模拟抢票:

from multiprocessing import Process,Lock
import time

def buy_ticket(i,l):
    l.acquire()
    with open('余票','r') as f1:
        count = int(f1.read())
        time.sleep(1)

    if count > 0:
        print('33[31m 第%s个人买到票了33[0m'%i)
        count -= 1
    else:
        print('33[32m 第%s个人没有买到票 33[0m'%i)
    with open('余票','w') as f:
        f.write(str(count))
    l.release()

def get(i):
    with open('余票') as f1:
        count = f1.read()
    print('第%s个人查到余票还剩%s张'%(i,count))
if __name__ == '__main__':
    l = Lock()
    for i in range(10):
        p = Process(target=buy_ticket,args=(i+1,l))
        p.start()
    for i in range(10):
        p1 = Process(target=get,args=(i+1,))
        p1.start()
View Code

示例2;模拟ATM存取款

from multiprocessing import Process,Value,Lock
import time


def get_money(num,l):# 取钱
    l.acquire()# 拿走钥匙,锁上门,不允许其他人进屋
    for i in range(100):
        num.value -= 1
        print(num.value)
        time.sleep(0.01)
    l.release()# 还钥匙,打开门,允许其他人进屋

def put_money(num,l):# 存钱
    l.acquire()
    for i in range(100):
        num.value += 1
        print(num.value)
        time.sleep(0.01)
    l.release()

if __name__ == '__main__':
    num = Value('i',100)
    l = Lock()
    p = Process(target=get_money,args=(num,l))
    p.start()
    p1 = Process(target=put_money, args=(num,l))
    p1.start()
    p.join()
    p1.join()
    print(num.value)
View Code

五,信号量

     from multiprocessing import Semaphore

    l = Semaphore(3)      ===>互斥锁同时只允许一个线程更改程序,Semaphore则同时允许多个线程更改程序

                                      ===>后面的进程只能等待锁释放了,才能获取锁,和进程池的概念很像,信号量涉及到加锁

示例:

from multiprocessing import Process,Semaphore
import time
import random

def func(i,sem):
    sem.acquire()
    print('第%s个人进入小黑屋,拿了钥匙锁上门' % i)
    time.sleep(random.randint(3,5))
    print('第%s个人出去小黑屋,还了钥匙打开门' % i)
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(5)# 初始化了一把锁5把钥匙,也就是说允许5个人同时进入小黑屋
    # 之后其他人必须等待,等有人从小黑屋出来,还了钥匙,才能允许后边的人进入
    for i in range(20):
        p = Process(target=func,args=(i,sem,))
        p.start()
View Code

六,事件机制:

    from multiprocessing  import  Even

    l = Even()           ===>理解:通过is_set()的bool值,去标识e.wait()的阻塞状态

                                       当is_set()的bool值为False时,e.wait()是阻塞状态

                                       当is_set()的bool值为True时,e.wait()是非阻塞状态

                                       当使用e.set()时,是把is_set()的bool变为True

                                       当使用e.clear()时,是把is_set()的bool变为False

示例:红绿灯

from multiprocessing import Process,Event
import time
import random

def tra(e):
    '''信号灯函数'''
    # e.set()
    # print('33[32m 绿灯亮! 33[0m')
    while 1:# 红绿灯得一直亮着,要么是红灯要么是绿灯
        if e.is_set():# True,代表绿灯亮,那么此时代表可以过车
            time.sleep(5)# 所以在这让灯等5秒钟,这段时间让车过
            print('33[31m 红灯亮! 33[0m')# 绿灯亮了5秒后应该提示到红灯亮
            e.clear()# 把is_set设置为False
        else:
            time.sleep(5)# 此时代表红灯亮了,此时应该红灯亮5秒,在此等5秒
            print('33[32m 绿灯亮! 33[0m')# 红的亮够5秒后,该绿灯亮了
            e.set()# 将is_set设置为True

def Car(i,e):
    e.wait()# 车等在红绿灯,此时要看是红灯还是绿灯,如果is_set为True就是绿灯,此时可以过车
    print('第%s辆车过去了'%i)

if __name__ == '__main__':
    e = Event()
    triff_light = Process(target=tra,args=(e,))# 信号灯的进程
    triff_light.start()
    for i in range(50):# 描述50辆车的进程
        if i % 3 == 0:
            time.sleep(2)
        car = Process(target=Car,args=(i+1,e,))
        car.start()
View Code

 七, 生产者和消费者模型(这是一种编程思想,主要是解除多进程之间的数据耦合)

  生产者产生的数据供消费者使用时,由于生产时间或者延时导致多进程间的数据不同步,这样就要借助一个中间量来解决最理想的就是队列

  什么是数列?   每个进程都有桟 :   桟是个半开半不开的数据容器, 先进后出   (简称FILO)

                               队列则是遵循   是个安全的,有序的,带阻塞的数据容器   先进先出   (简称FIFO)

  (1) from  multiprocessing  import Queue

           q =  Queue(8)     ===>  实例化一个对象,括号里的数字表队列最大长度

   q.get()                 ===>阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待   

   q.put()                 ===>阻塞,如果可以继续往对列中放数据,就直接放,不能放就阻塞等待    注意;括号里的内容是对象,可以放一切对象

   q.get_nowait()     ===>不阻塞,如果有数据直接获取,没有数据就报错

   q.put_nowait()     ===> 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

  (2)from  multiprocrssing  import   JoinableQueue

    JoinableQueue继承的是Queue,所以也可以使用Queue的方法.

   q.join                   ===>  用于生产者,等待q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数

   q.task_done        ===>用于消费者,是指每消费队列中一个数据,就给join返回一个标识

示例:

#生产者消费者模型
from multiprocessing import Process,JoinableQueue
import time
def consumer(q,name):
    while True:
        goods = q.get()       #获取生产者的商品数据
        print('%s消费了%s'%(name,goods))
        q.task_done()    #这个用于消费者,是每消费队列中的一个数据,就给join返回一个标识
def producer(q,name):
    for i in range(20):
        goods = '%s做了%s号包子'%(name,i)
        q.put(goods)          #将生产者的商品数据放进队列
    q.join()         #用于生产者,等待q.task_done的返回结果,生产者就能获得消费者当前消费了多少数据
if __name__ == '__main__':
    q = JoinableQueue(5)   #造出来队列
    p = Process(target=consumer,args=(q,'alex'))
    c = Process(target=producer,args=(q,'egon'))
    p.daemon = True #把消费者设置为守护进程
    p.start()
    c.start()
    c.join()   #主程序等待生产者进程结束
#备注:这个模型有三个进程:   主进程,生产进程,消费进程
#主进程c.join会等待生产者进程执行结束,而生产者中q.join会阻塞等待消费者消费完队列中的生产数据
#所以将消费者设置为守护进程,当主进程执行完,就代表生产者进程已经结束,消费者也消费完列表中的数据
#所以,只要主程序执行完毕,代表着生产者,生产完,消费者也消费完了
View Code

  (3)import  queue

import queue

p1 = queue.Queue()#遵循先进先出
p1.put('gao')
p1.put(2)
print(p1.get())

p2 = queue.LifoQueue() #遵循后进先出
p2.put(5)
p2.put(4)
print(p2.get())
print(p2.get())

p3 = queue.PriorityQueue()   #优先级队列(数字越小越先执行)
#put()方法接受的是一个元组(),第一个位置是优先级,第二个位置是数据
#优先级是按照ASC码比较的,当ASCII相同时,会按照先进先出的原则
#数字转ASCII   =====>chr(65)    字符转数字    =====>ord('20')
#字符串的大小比较是按照首字母的大小比较的
#中文是按照字节码的首位  来比较大小的
p3.put((1,'abc'))
p3.put((5,'abc'))
p3.put((-5,'abc'))
print((p3.get()))
View Code

八,  进程池     (注意这个是重点!)    

  from    multipricessing   import   Pool

  p =  Pool(5)     ====>这个进程数一般设置成比cpu合数大一位的      (os.cpu_count())

  进程池:    一个池子,在里面有固定数量的进程,处于待命状态,只要有任务来就去处理

                在实际的业务中,任务是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数

                     开启多进程需要操作系统为你管理它,其次还需要大量的时间 让cpu去调度它,这些都是很花时间的

  进程池有三个方法:  进程池会帮助程序员去管理池中的进程

<1>map(func,iterable)

  func: 进程池中的进程执行的任务函数

  iterable:可迭代的对象,是把可迭代的对象中的每个元素依次传给任务函数当参数

<2>.apply(func,args= ())                                          ===> 同步进程,是把池中的进程一个一个的去执行任务

  func:进程池中的进程执行的任务函数

  args: 可迭代对象型的参数,是传给任务函数的参数

  同步处理任务时:不需要close和join     而且所有的进程是普通进程(主进程要等待其执行结束)  

<3>apply_async(func,args=(),callback=None)         ===>异步进程,是指的池中的进程一次性都去执行任务

  func:进程池中的进程执行的任务函数

  args: 可迭代对象型的参数,是传给任务函数的参数

  callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交回给回调函数,由回调函数进行下一步的处理,

            注意::只有在异步的时候,才有回调函数,同步没有,   还有就是回调函数是主程序的进程

  异步处理任务时:进程池中的所有进程是守护进程(主程序代码执行完毕守护进程就结束)

  异步处理任务时:;必须要加上p.close()      ====>  因为异步执行时,执行快的程序执行完毕时,需要归还占用的系统资源,供下次使用

                                        也要加上p.join()       ====>  因为异步执行时,进程池中的所有进程是守护进程(主程序代码执行完毕守护进程就结束)

  回调函数的使用:

    进程的任务函数的返回值,被当成返回调用函数的形参接受到,以此进行进一步的处理操作回调函数是由主程序调用的,而不是子进程

    子进程只是把结果传给回调函数

 九,管道:(了解)

  from  multiprocessing  import  Pipe

  con1,con2 = Pipe()          #管道是不安全的,是用于多进程之间的通信的一种方式

  (1) 如果在单进程中使用管道,     那么就是con1收数据,con2发数据

                                                             如果是con1发数据,con2收数据

  (2)在多进程中使用管道,那么就是 父进程使用con1收,子进程就必须使用con2发

                                  父进程使用con1发,子进程就必须使用con2收

                     父进程使用con2收,子进程就必须使用con1发

                   父进程使用con2发,子进程就必须使用con1收

  

   在管道中有一个著名的错误叫做EOFError。是指,父进程中如果关闭了发送端,子进程还继续接收数据,那么就会EOFError。

十,进程之间共享内容(典型的银行的多进程见的数据共享)

(1)from multiprocessing  import  Manager  

  m = Manager()

  num = m,dict({键:值})   #注意这里需要什么样的数据类型,就创建什么类型的

  num = m.list([1,2,3,6])         #在进程中用的时候num.[x] 就可以

(2)from multiprocess,ing  import   Value

  num = Value('i',100)   #在进程中用的时候是num.value

  

原文地址:https://www.cnblogs.com/laogao123/p/9507322.html