管道

from multiprocessing import Pipe,Process

def func(conn1,conn2):
    conn2.close()
    while True:
        try :
            msg = conn1.recv()
            print(msg)
        except EOFError:
            conn1.close()
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    Process(target=func,args = (conn1,conn2)).start()
    conn1.close()
    for i in range(20):
        conn2.send('吃了么')
    conn2.close()
管道
# from multiprocessing import Lock,Pipe,Process 
# def producer(con,pro,name,food):
#     con.close()
#     for i in range(100):
#         f = '%s生产%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
#     pro.send(None)
#     pro.send(None)
#     pro.send(None)
#     pro.close()
#
# def consumer(con,pro,name,lock):
#     pro.close()
#     while True:
#             lock.acquire()
#             food = con.recv()
#             lock.release()
#             if food is None:
#                 con.close()
#                 break
#             print('%s吃了%s' % (name, food))
# if __name__ == '__main__':
#     con,pro = Pipe()
#     lock= Lock()
#     p = Process(target=producer,args=(con,pro,'egon','泔水'))
#     c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
#     c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
#     c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
#     c1.start()
#     c2.start()
#     c3.start()
#     p.start()
#     con.close()
#     pro.close()

# from multiprocessing import Process,Pipe,Lock
#
# def consumer(produce, consume,name,lock):
#     produce.close()
#     while True:
#         lock.acquire()
#         baozi=consume.recv()
#         lock.release()
#         if baozi:
#             print('%s 收到包子:%s' %(name,baozi))
#         else:
#             consume.close()
#             break
#
# def producer(produce, consume,n):
#     consume.close()
#     for i in range(n):
#         produce.send(i)
#     produce.send(None)
#     produce.send(None)
#     produce.close()
#
# if __name__ == '__main__':
#     produce,consume=Pipe()
#     lock = Lock()
#     c1=Process(target=consumer,args=(produce,consume,'c1',lock))
#     c2=Process(target=consumer,args=(produce,consume,'c2',lock))
#     p1=Process(target=producer,args=(produce,consume,30))
#     c1.start()
#     c2.start()
#     p1.start()
#     produce.close()
#     consume.close()

# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象

# 队列 进程之间数据安全的
# 管道 + 锁
管道实现生产者消费者模型
管道实现数据共享
# from multiprocessing import Manager,Process

# def main(dic):
#     dic['count'] -= 1
#     print(dic)
#
# if __name__ == '__main__':
#     m = Manager()
#     dic=m.dict({'count':100})
#     p_lst = []
#     p = Process(target=main, args=(dic,))
#     p.start()
#     p.join()

from multiprocessing import Manager,Process,Lock
def main(dic,lock):
    dic['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    l = Lock()
    dic=m.dict({'count':100})
    p_lst = []
    for i in range(50):
        p = Process(target=main,args=(dic,l))
        p.start()
        p_lst.append(p)
    for i in p_lst: i.join()
    print('主进程',dic)
管道实现数据共享

非常重要的进程池概念:用少的继承解决更多的事情

##为什么有进程池的概念?
##效率问题
##每次开启进程要占用属于这个进程的空间
##寄存器  堆栈  文件
##进程过多时 操作系统要调度
##
##
##什么是进程池?
##将进程放入一个专属的内存空间就像是放入了池子中,然后开辟n下限,m上限.相当于出水口
##这样多进程就不会占用系统太多的内存空间,操作系统也好调度进程了.
##
##当然高级进程池可以自动的调换出水口多少
##
##
##python中用到multiprocessing模块的Pool方法
进程池基础知识
import time
from multiprocessing import Process,Pool

def func(n):
     for i in range(10):
          print(n+1)

def func2(n):
     for i in range(10):
          print(n+2)

if __name__=="__main__":
     start = time.time()
     pool = Pool(5)
     pool.map(func,range(100))
     pool.map(func2,range(100))
     t1 = time.time()-start

     start = time.time()
     p_lst = []
     for i in range(100):
         p = Process(target=func,args=(i,))
         p_lst.append(p)
         p.start()
     for p in p_lst :p.join()
     t2 = time.time() - start
     print(t1,t2)
进程池
import os
import time
from multiprocessing import Pool#默认起cpu个数
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s' % n,os.getpid())

if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
         p.apply(func,args=(i,))#同步提交任务
##        p.apply_async(func,args=(i,))#异步提交任务
    p.close()  # 结束进程池接收任务
    p.join()   # 感知进程池中的任务执行结束
进程池的同步异步
# p = Pool()
# p.map(funcname,iterable)     默认异步的执行任务,且自带close和join
# p.apply   同步调用的
# p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join
# from multiprocessing import Pool
# def func(i):
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     for i in range(10):
#         res = p.apply(func,args=(i,))   # apply的结果就是func的返回值
#         print(res)

# import time
# from multiprocessing import Pool
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     res_l = []
#     for i in range(10):
#         res = p.apply_async(func,args=(i,))   # apply的结果就是func的返回值
#         res_l.append(res)
#     for res in res_l:print(res.get())# 等着 func的计算结果

# import time
# from multiprocessing import Pool
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     ret = p.map(func,range(100))
#     print(ret)
进程池的返回值
# 回调函数
import os
from multiprocessing import Pool
def func1(n):
    print('in func1',os.getpid())
    return n*n

def func2(nn):
    print('in func2',os.getpid())
    print(nn)

if __name__ == '__main__':
    print('主进程 :',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
进程池回调函数
原文地址:https://www.cnblogs.com/cangshuchirou/p/8665167.html