三十一、管道,进程数据共享,进程池,进程池的返回值,回调函数

一、管道

管道:进程与进程之间能相互通信
通信原理:是基于管道双向通信
from multiprocessing import Pipe, Process
conn1, conn2 = Pipe()
conn1.send("123456")
print(conn2.recv())
from multiprocessing import Pipe, Process
def func(conn):
    while True:
        msg = conn.recv()
        if msg == None:
            break
        print(msg)


if __name__ == '__main__':
    conn1, conn2 = Pipe()
    Process(target=func, args=(conn1,)).start()
    for i in range(10):
        conn2.send("吃了么")
    conn2.send(None)



"""
#第二种:
def func(conn1, conn2):
    conn2.close()
    while True:
        try:
            msg = conn1.recv()
            print(msg)
        except EOFError :
            conn1.close()
            break


if __name__ == '__main__':
    conn1, conn2 = Pipe()
    Process(target=func, args=(conn1,conn2)).start()
    conn1.close()
    for i in range(10):
        conn2.send("吃了么")
    conn2.close()
    # 疑问:主进程关闭通道不会影响子进程接收数据
    #
    # ********应该特别注意管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将它关闭
    这也说明为何在生产中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中
    的recv()操作阻塞。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常
    因此,在生产中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点
    # """
例子
from multiprocessing import Pipe, Process, Lock
import random, time


def producer(con, pro, name, food):
    con.close()
    for i in range(4):
        time.sleep(random.randint(1, 3))
        f = "%s生产%s%s" % (name, food, i)
        print(f)
        pro.send(f)
    pro.close()


def consumer(con, pro, name):
    pro.close()
    while True:
        try:
            # lock.acquire()
            food = con.recv()
            # lock.release()
            print("%s吃了%s" % (name, food))
            time.sleep(random.randint(1, 3))
        except EOFError:
            print("%s吃完啦" % name)
            con.close()
            break


if __name__ == '__main__':
    con, pro = Pipe()
    # lock = Lock()
    p = Process(target=producer, args=(con, pro, "猪狗", "泔水"))
    p.start()
    c = Process(target=consumer, args=(con, pro, "逗比"))
    c1 = Process(target=consumer, args=(con, pro, "坦克" ))
    c.start()
    c1.start()
    con.close()
    pro.close()

# Pipe 管道数据不安全(可能发生多进程抢一个数据)枷锁来控制抢资源对象   管道属于最底层的东西
# 队列是 管道+锁 所以比较安全,一般用队列
"""
    应该特别注意管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将它关闭
    这也说明为何在生产中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中
    的recv()操作阻塞。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常
    因此,在生产中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点
"""
管道消费者模型

二、进程之间的数据共享

模块:Manager

from multiprocessing import Manager, Process, Lock
def main(dic, lock): lock.acquire() dic["count"] -= 1 lock.release() print(dic) if __name__ == '__main__': lock = Lock() # 牺牲效率变成串行(同步),但是安全 m = Manager() # 可以把数据共用多个进程共享,但是会发生多个子进程抢同一个资源,造成数据混乱,使用必须加锁 dic = m.dict({"count": 100}) p_lis = [] for i in range(50): p = Process(target=main, args=(dic, lock)) p_lis.append(p) # 把对象添加到列表 p.start() for i in p_lis: i.join() # 把每个对象添加join,所有子进程结束,主进程才运行 print("主进程:", dic)

三、进程池

效率:
每次开启进程,开启属于这个进程的内存空间
寄存器 堆栈 文件
进程过多 操作系统的调度

进程池:
python中 先创建一个属于进程的池子
这个池子指定能存放n个进程
先将这些进程创建好,如果有n个任务,就让n个进程去执行,这样减少cpu的占用率,提高效率(防止一起去执行,占用资源过大)
1.Pool模块进程池
from multiprocessing import Pool, Process
import time


def fun(n):
    for i in range(10):
        print(n + 1)


if __name__ == '__main__':
    start1 = time.time()
    pool = Pool(5)  # 5个进程
    pool.map(fun, range(100))  # 100个任务 自带join 和close   map参数必须是可迭代
    t1 = time.time() - start1
    start2 = time.time()
    p_lis = []
    for i in range(100):
        p = Process(target=fun, args=(i,))
        p_lis.append(p)
        p.start()
    for i in p_lis: i.join()
    t2 = time.time() - start2
    print(t1, t2)  # 0.1535499095916748 2.3756473064422607

# 进程池:是同时并行执行5个程序,差不多执行20次  多进程:是100个子进程分别(异步)执行一次,100次
# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用join,等待进程池内任务都处理完,然后可以用get收集结果
# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了"""
from multiprocessing import Pool
import time, os


def func(n):
    print("start func %s" % n, os.getpid())
    time.sleep(1)
    print("end func%s" % n, os.getpid())


if __name__ == '__main__':
    pool = Pool(5)
    # pool.map(func,)
    for i in range(20):
        pool.apply_async(func, args=(i,))  # apply同步提交任务  apply_async异步提交任务
    time.sleep(5)
    # pool.close()  # 结束进程池接收任务    关闭进程池,防止进一步操作。如果所有操作持续阻塞,它们将在工作进程终止前完成
    # pool.join()  # 感知进程池中的任务执行结束
# 5个进程交替执行任务
# apply_async()需要先close 后join 来保持子进程和主进程代码的同步性

2.第二种方法:from concurrent.futures import ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import time
import os


def task(n):
    print(n, os.getppid())
    time.sleep(2)
    return n * 2


def call_back(n):
    print("拿到异步提交任务返回结果:", n.result())


if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)
    t_lis = []
    for i in range(10):
        res = pool.submit(task, i).add_done_callback(call_back)  #异步多进程
        t_lis.append(res)
    pool.shutdown()  # 相当于p.join和p.close()  等待子进程结束
    print("主进程:", os.getpid())

三、进程池的返回值

第一种:

from multiprocessing import Pool
import time


def func(i):
    time.sleep(1)
    return i * i


if __name__ == '__main__':
    p = Pool(5)
    # res = p.map(func, range(10))   # 自带close和join 进程结束后,一次性打印
    # print(res)
    res_lis = []
    for i in range(10):
        res = p.apply_async(func, args=(i,))
        res_lis.append(res)
        # print(res.get())  # get()阻塞等待结果
    p.close()
    p.join()
      for i in res_lis:
    print(i.get())  # 拿到返回值
 

第二种:

from concurrent.futures import ProcessPoolExecutor
import time


def func(i):
    time.sleep(1)
    return i ** 2


if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)
    res_list = []
    for i in range(10):
        res = pool.submit(func, i)
        res_list.append(res)
    pool.shutdown()  # 相当于close()和join()
    for i in res_list:
        print(i.result())  # 不同于上面是get()换成result()

*****模块不一样方式不同

四、回调函数

回调函数会在主函数中进行

第一种:

from concurrent.futures import ProcessPoolExecutor
import os


def func1(n):
    print("in func1 ", os.getpid())
    return n * n


def func2(nn):
    print("in func2 ", os.getpid())
    print(nn.result())

if __name__ == '__main__':
    pool =ProcessPoolExecutor(5)
    pool.submit(func1,10).add_done_callback(func2)
    pool.shutdown()  # 关机
    print("主进程:",os.getpid())

第二种:

from multiprocessing import Pool
import os


def func1(n):
    print("in func1", os.getpid())
    return n * n


def func2(nn):
    print("in func2", os.getpid())
    print(nn)


if __name__ == '__main__':
    p = Pool(5)
    # for i in range(10):
    p.apply_async(func1, args=(10,), callback=func2)
    p.close()
    p.join()
    print("主进程:", os.getpid())  # func2进程数和主进程一样,证明回调

五、进程池socket 通信高并发

#服务端
from multiprocessing import Pool
import os


def func1(n):
    print("in func1", os.getpid())
    return n * n


def func2(nn):
    print("in func2", os.getpid())
    print(nn)


if __name__ == '__main__':
    p = Pool(5)
    # for i in range(10):
    p.apply_async(func1, args=(10,), callback=func2)
    p.close()
    p.join()
    print("主进程:", os.getpid())  # func2进程数和主进程一样,证明回调
#客户端
import socket

client = socket.socket()
client.connect(("127.0.0.1", 8080))

ret = client.recv(1024).decode("utf8")
print(ret)
msg = input(">>>>>>>:").encode("utf8")
client.send(msg)

client.close()

六、利用多线程,爬虫例子

""" 装模块方法:cmd中: pip3 install 模块名"""
import requests
from multiprocessing import Pool


def get(url):
    response = requests.get(url)
    if response.status_code == 200:
        return url, response.content.decode("utf8")


def call_back(args):
    url, content = args
    print(url, len(content))


if __name__ == '__main__':
    url_lst = ["http://www.baidu.com/",
               "https://www.cnblogs.com",
               "https://www.sogou.com/"]
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get, args=(url,),callback=call_back)
    p.close()
    p.join()
爬虫
 
原文地址:https://www.cnblogs.com/wukai66/p/11358991.html