day10---multiprocess 多进程

multiprocess
Queue   Pipe 只是实现进程间数据的传递
Manager 实现了进程间数据的共享,即多个进程可以修改同一份数据
 

进程模块 multiprocessing

#!/usr/bin/env python
# -*- coding: utf-8 -*-


from multiprocessing import Process
def run(name):
    print('my name is %s'% name)
if __name__ == '__main__':
    p = Process(target=run,args=('lilei',))#创建一个进程实例
    p.start()
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:DCC
from multiprocessing import Process
import multiprocessing
import time,threading
import os
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("

")
def f(name):
    info('33[31;1mfunction f33[0m')
    print('hello', name)
if __name__ == '__main__':
    info('33[32;1mmain process line33[0m')
    p = Process(target=f, args=('bob',))
    p.start()

def thread_run():
    print(threading.get_ident())
def run(name):
    time.sleep(2)
    print("hello",name)
    t = threading.Thread(target=thread_run,)
    t.start()
if __name__ == "__main__":
    for i in range(5):
        p = multiprocessing.Process(target=run,args=("bob %s" %i,))
        p.start()

进程号获取,父子进程关系

from multiprocessing import Process
import os

def info(title):    #info 函数打印父子进程
    print(title)
    print('module name:',__name__)
    print('parent process name:',os.getppid())#打印父进程
    print('child process name:',os.getpid())#打印子进程

def f(name):
    info('33[31;1m called from child process function f 33[0m')#打印f函数的父子进程
    print('hello ',name)

if __name__ == '__main__':
    info('33[32;1m main process 33[0m')     #主程序调用info函数打印父子进程
    p = Process(target=f,args=('hanmeimei',))        #主程序启动一个子进程,打印子进程的父子函数
    p.start()
    p.join()

进程间通信Queue,pipe

from multiprocessing import Queue,Process

def f(cq):
    print('in child before cq.put:',cq.qsize()) #子进程put前查看队列中是否有数据
    cq.put(['my','name','is',['lilei','xixi']]) #往队列中添加一个元素

if __name__ == '__main__':
    mq = Queue()            #定义进程队列实例
    mq.put('fome main')     #往队列中添加一个元素
    p = Process(target=f,args=(mq,))#创建一个子进程,并将mq传给子进程
    p.start()                       #启动
    p.join()                        #等待子进程执行完毕
    print('444',mq.get_nowait())#获取队列元素
    print('444',mq.get_nowait())
from multiprocessing import  Process,Pipe

def f(conn):
    conn.send("from child1") #发送数据
    conn.send("from child2") #发送数据
    print('client recv:',conn.recv())#接收数据
    conn.close()

if __name__ == '__main__':
    a_conn, b_conn = Pipe()     #创建管道
    p = Process(target=f,args=(b_conn,))    #实例化子进程,函数f,参数管道的一端
    p.start()
    print(a_conn.recv())
    print(a_conn.recv())
    a_conn.send('from parent') #父进程发送数据
    p.join()

进程间数据共享manager

from multiprocessing import Process,Manager
import os
def run(d,l):
    d[os.getpid()] = os.getpid()   #以当前子进程的pid为key,同时pid也作为value
    l.append(os.getpid())           
    print(d,l)

if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()  #manager 字典
            l = manager.list()  #manager 列表
            p_list = []         #空的列表,为之后的添加进程实例
            for i in range(10): #启动多个子进程
                p = Process(target=run,args=(d,l))#起一子进程,执行run参数d,l
                p.start()
                p_list.append(p)  #添加进程实例至列表
            for r in p_list:    #循环子进程
                r.join()          #等待子进程结束
            print(d)            #打印最终的字典
            print(l)            #打印最终的列表

 进程同步

Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

进程池 

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
from  multiprocessing import Process,Pool
import time
 
def Foo(i):
    time.sleep(2)
    return i+100
 
def Bar(arg):
    print('-->exec done:',arg)
 
pool = Pool(5)
 
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)
    #pool.apply(func=Foo, args=(i,))
 
print('end')
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:DCC
from  multiprocessing import Process, Pool ,freeze_support
import time
import os

def Foo(i):
    print("in process",os.getpid())
    time.sleep(2)
    return i + 100


def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == "__main__":
    freeze_support()
    pool = Pool(3)
    #pool = Pool(processes=5)
    print("主进程号", os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback 回调,当func进行完的时候,再执行callback
        # pool.apply(func=Foo, args=(i,))


    print('end')
    pool.close() # 必须先关闭,再join
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
原文地址:https://www.cnblogs.com/dcc001/p/5956874.html