Python9-进程池-day38

复习

# 信号量
from multiprocessing import  Semaphore
# 用锁的原理实现的,内置了一个计数器
# 在同一个事件,只能有指定数量的进程执行某一段被控制住的代码
# 事件
# wait阻塞受到事件控制的同步组件
# 状态  True  Flase  is_set
#         true--》false  用clear()
#         false --->true  用set()
# wait方法 状态为true不阻塞  状态为false的时候阻塞

# 队列
# Queue
#     put     当队列满的时候阻塞等待队列有空位置
#     get     当队列空的时候阻塞等待队列有数据
#     full empty  不完全准确

# JoinableQuere
# task_done   与get连用
# join        与put连用

管道

from multiprocessing import Pipe,Process
def func(conn1,conn2):
    conn2.close()
    while True:
        try:
            msg = conn1.recv()
            print(msg)
        except EOFError:
            conn1.close()
            break

if __name__ == '__main__':
    conn1,conn2 = Pipe()
    Process(target=func,args=(conn1,conn2)).start()
    conn1.close()
    for i in range(20):
        conn2.send('吃了吗')
    conn2.close()
from multiprocessing import  Pipe,Process
import time,random
def producer(con,pro,name,food):
    con.close()
    for i in range(4):
       time.sleep(random.randint(1,3))
       f = '%s生产%s%s'%(name,food,i)
       print(f)
       pro.send(f)
    pro.close()
def consumer(con,pro,name):
    pro.close()
    while True:
        try:
           food =  con.recv()
           print('%s吃了%s'%(name,food))
           time.sleep(random.randint(1,3))
        except EOFError:
            con.close()
            break
if __name__ == '__main__':
    con,pro = Pipe()
    p = Process(target=producer,args = (con,pro,'egon','泔水'))
    p.start()
    c = Process(target=consumer,args = (con,pro,'alex'))
    c.start()
    con.close()
    pro.close()
进程之间的数据共享
from multiprocessing import  Manager,Process,Lock

def main(dic,lock):
    lock.acquire()
    dic['count'] -= 1
    lock.release()


if __name__ == '__main__':
    m = Manager()
    l = Lock()
    dic = m.dict({'count':100})
    p_list = []
    for i in range(50):
        p = Process(target=main,args=(dic,l))
        p.start()
        p_list.append(p)
    for i in p_list: i.join()
    print('主进程:',dic)

 进程池

# 为什么有进程池
# 效率
# 每开启进程,开启属于这个进程的内存空间
# 寄存器 堆栈 文件
# 进程过多,操作系统调度进程
# 进程池
# python中的先创建一个属于进程的池子
# 这个池子指定能存放多少个进程
# 先将这些进程创建好
from multiprocessing import Pool
import  os,time
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s'%n,os.getpid())
if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,))
    p.close()  #结束进程池接受任务
    p.join()  #感知进程池中的任务执行结束

 socket_server-进程池

#server
import socket
from multiprocessing import Pool

def func(conn):
    conn.send(b'hello')
    print(conn.recv(1024).decode('utf-8'))
    conn.close()
if __name__ == '__main__':
    p = Pool(5)
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        p.apply_async(func,args=(conn,))
    sk.close()
#client
import socket

sk = socket.socket()
sk.connect(('127.0.0.1',8080))
ret = sk.recv(1024).decode('utf-8')
print(ret)
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()

 进程池返回值

# p.map(funcname,iterable)  默认异步的执行任务,自带close和join
# p.apply 同步调用
# p.apply_async 异步调用 和主进程完全异步 需要手动close和join
from multiprocessing import Pool
def func(i):
    return i*i

if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        res = p.apply(func,args=(i,))  #apply的结果就是func的返回值
        print(res)
import time
from multiprocessing import Pool
def func(i):
    time.sleep(0.5)
    return i*i

if __name__ == '__main__':
    p = Pool(5)
    res_list = []
    for i in range(10):
        res = p.apply_async(func,args=(i,))  #
        res_list.append(res)
    for res in res_list:print(res.get())
#map
import time
from multiprocessing import Pool
def func(i):
    time.sleep(0.5)
    return i*i

if __name__ == '__main__':
    p = Pool(5)
    ret = p.map(func,range(10))
    print(ret)


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 进程池的回调函数

from multiprocessing import Pool

def func1(n):
    print('in func1')
    return n*n
def func2(nn):
    print('in func2')
    print(nn)

if __name__ == '__main__':
    p = Pool(5)

    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()

in func1
in func2
100

 
from multiprocessing import Pool
import os
def func1(n):
    print('in func1',os.getpid())
    return n*n
def func2(nn):   #参数只能是func1的返回值
    print('in func2',os.getpid())
    print(nn)

if __name__ == '__main__':
    print('主进程: ',os.getpid())
    p = Pool(5)

    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()

主进程:  11172
in func1 11760
in func2 11172
100
原文地址:https://www.cnblogs.com/zhangtengccie/p/10392478.html