通过队列完成进程之间的通信

  

一、进程之间的通信方式

    1、socket(进程通信的网络模式)

  

   2、文件操作(本地通信模式,但是效率低,因为文件是在硬盘上操作的)

  3、对列(本地通信模式,数据先进先出)

  4、栈(本地通信模式,数据先进后出)

二、两种数据存储方案

    1、列队:数据先进先出

    2、栈:数据先进后出

    

三、对列的作用和特点

    1、解耦:

     说明:

    即关系比较紧密的两个程序修改了其中一个,另一个也必须要跟着修改,否则不能正常使用。

    所以对于程序而言,耦合性越低程序越好。

      2、解耦的使用场景:

    例如:使用多进程下载数据并处理。进程之间互不干扰。

    A进程负责下载数据。

    B进程负责处理数据。

  3、队列特点:数据先进先出。即先存入的数据会被先取出来。

四、列队的使用

  1、语法:

  

    from multiprocessing import Queue

 

    # 若不写数量则根据硬件情况默认最大数

    队列对象 = Queue(数量)  

   2、参数说明:

      放置数据:列表对象.put(数据)

      取数据:列表对象.get(数据), 如果数据已被取完则程序会一直等待。

      队列对象.get_nowait(): 当数据已经被取完时,则以异常方式告诉程序

      判断队列中的数据是否为满: 队列对象.full()

      判断队列中的数据是否为空:队列对象.empty()

 3、通过队列取数据

  

from multiprocessing import *

q = Queue(3)  # 创建队列实例,最大可以放3个数据

print(q.empty())  # 判断数据是否为空,值为True表示数据未放满

q.put("xixi")  # 放置数据
q.put("haha")
q.put([1,2,3])

# 判断数据是否放满了
print(q.full())  # 值为True表示满了

# 取出数据
print(q.get())
print(q.get())
print(q.get())
View Code

执行结果:

4、使用多进程在队列中存入和读出数据

from multiprocessing import *


def download_from_web(q):
    """模拟下载好的数据"""
    data = [1, 2, 3, 4, 5]

    # 向队列中写入数据
    for i in data:
        q.put(i)
    print("下载器已下完,并存放到队列当中")


def analysis_data(q):
    """模拟数据处理"""
    waitting_analysis_data = list()  # 用于存储队列中取到的数据

    while True:

        # 将队列中取到的数据存放到定义的列表中。
        data = q.get()
        waitting_analysis_data.append(data)
        print(data)
        # 判断队列为是否为空
        if q.empty():
            break

    print(f"查看取到的数据:{waitting_analysis_data}")


def main():
    # 创建一个队列对象
    q = Queue()

    # 创建多进程,并将队列对象作为参数传入
    p1 = Process(target=download_from_web(q))
    p1.daemon = True
    p2 = Process(target=analysis_data(q))

    p1.start()
    p2.start()


if __name__ == "__main__":
    main()
View Code

执行结果:

五、使用进程池

1、定义:进程池是一个特殊的容器,这个容器中可以容纳许多进程。

作用:

  重复利用进程池中的进程。

  减少进程的创建,提高运行效率

2、应用场景:

  任务数量不确定时使用进程池。

3、进程池注意点:进程池中事先创建的进程并不会立即使用,只有运行任务的时候才会使用。

4、基本语法:

  

  from multiprocessing import Pool

 

  进程池对象 = Pool(预定的进程数数量)

  进程池对象.apply_async(任务名, (参数1, 参数2, ......))

 

  # 关闭进程池

  进程池对象.close()

  

5、进程池的使用示例

from multiprocessing import Pool
from time import *
import os, random


def task(num):
    t_start = time()
    print(f"{num}号进程开始执行,进程号为:{os.getpid()}")
    sleep(random.random() * 2)  # 让进程随机睡秒数
    t_stop = time()
    print(f"{num}号进程执行完毕,耗时:{t_stop - t_start}秒")


def main():
    po = Pool(3)  # 创建进程池,设定预定进程数数量为3

    for i in range(1, 11):  # 创建10个进程
        po.apply_async(task, (i,))

    # 关闭进程池
    po.close()
    po.join()  # 等待进程池中的所有子进程执行完成,必须放在close语句之后


if __name__ == "__main__":
    main()
View Code

执行结果:

原文地址:https://www.cnblogs.com/yujiemeigui/p/14299289.html