Python----分布式进程使用(Queue和BaseManager使用)---(搬运)

分布式进程

需要模块

  • multiprocessing和queue模块
  • 使用BaseManager创建分布式管理器
  • 使用Queue创建队列,用于多个进程之间的通信

分布式进程原理

  • managers子模块支持把多个进程分布到多台机器上
  • 可以写一个服务进程作为调度者,将任务分布到其它多个进程中,然后通过网络通信进行管理
  • 比如爬取图片:一般一个进程负责抓取图片的地址,将地址放在Queue(容器)队列中
  • 另外一个进程负责从Queue队列中取出链接地址进行图片下载和存储到到本地
  • 上述爬取图片的过程就可以做成分布式,一台机器负责获取链接,另外一台机器负责下载存储
  • 上述问题核心:将Queue队列暴露到网络中,让其他机器可以访问

分布式进程实现步骤

  • 建立Queue队列,负责进程之间的通信,任务队列task_queue,结果队列result_queue
  • 把第一步中的两个队列在网络中注册,注册时候将队列重新命名
  • 创建一个Queuemanager(BaseManager)的实例manager,相当于一个服务器,给定IP地址、端口和验证码
  • 启动实例manager
  • 访问Queue对象,即创建网络中暴露重命名后的Queue实例
  • 创建任务到本地队列中,自动上传任务到网络队列中,分配给任务进程进行处理
  • 任务进程先从网络中任务队列中取出任务,然后执行,将执行结果放入到网络中的结果队列中
  • 服务进程从结果队列中取出结果,直到执行完所有任务和取出所有的结果,任务进程关闭,然后服务进行关闭

注意

  • 先创建服务进程,再创建任务进程
  • 执行时,先启动服务进程,在创建任务进程,启动任务进程不要超过服务进程取出结果的等待时间

分布式进程实例

  • 创建一个分布式进程,用来完成10次乘法任务

服务进程:

# 服务进程在windows系统和Linux系统上有所不同
# 创建一个分布式进程:包括服务进程和任务进程
# 多个进程之间的通信使用Queue
# 该代码为服务进程
# 注意,运行时先运行服务进程,再运行任务进程
# 任务执行循序:
# 服务进程和任务进行都创建了相同的两个队列,一个用来放任务,一个用来放结果
# 第一步:服务进程运行,比如将数字2放进任务队列,任务进程从任务队列中取出数字2
# 第二步:取出数字,执行任务,就是2*2=4, 任务执行完后,放进结果队列
# 第三步:服务进程从结果队列中,取出结果。
# 第四步:所有任务执行完毕,所有结果都已经取出,最终任务队列和结果队列都是空的了

# -*- coding:utf-8 -*-
import random, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 第一步:定义两个Queue队列,一个用于发送任务,一个接收结果
task_queue = queue.Queue()
result_queue = queue.Queue()
# 创建类似的QueueManager,继承BaseManager,用于后面创建管理器
class QueueManager(BaseManager):
    pass
# 定义两个函数,返回结果就是Queue队列
def return_task_queue():
    global task_queue # 定义成全局变量
    return task_queue # 返回发送任务的队列
def return_result_queue():
    global result_queue
    return result_queue # 返回接收结果的队列

# 第二步:把上面创建的两个队列注册在网络上,利用register方法
# callable参数关联了Queue对象,将Queue对象在网络中暴露
# 第一个参数是注册在网络上队列的名称
def test():
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)

    # 第三步:绑定端口8001,设置验证口令,这个相当于对象的初始化
    # 绑定端口并填写验证口令,windows下需要填写IP地址,Linux下默认为本地,地址为空
    manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'abc') # 口令必须写成类似b'abc'形式,只写'abc'运行错误

    # 第四步:启动管理器,启动Queue队列,监听信息通道
    manager.start()

    # 第五步:通过管理实例的方法获访问网络中的Queue对象
    # 即通过网络访问获取任务队列和结果队列,创建了两个Queue实例,
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # 第六步:添加任务,获取返回的结果
    # 将任务放到Queue队列中
    for i in range(10):
        n = random.randint(0, 10) # 返回0到10之间的随机数
        print("Put task %s ..." % n)
        task.put(n) # 将n放入到任务队列中
    # 从结果队列中取出结果
    print("Try get results...")
    for i in range(11): # 注意,这里结果队列中取结果设置为11次,总共只有10个任务和10个结果,第10次用量确认队列中是不是已经空了
        # 总共循环10次,上面放入了10个数字作为任务
        # 加载一个异常捕获
        try:
            r = result.get(timeout=5) # 每次等待5秒,取结果队列中的值
            print("Result: %s" % r)
        except queue.Empty:
            print("result queue is empty.")

    # 最后一定要关闭服务,不然会报管道未关闭的错误
    manager.shutdown()
    print("master exit.")

if __name__ == '__main__':
    # Windows下多进程可能出现问题,添加以下代码可以缓解
    freeze_support()
    print("Start!")
    # 运行服务进程
    test()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

任务进程

# coding: utf-8
# 定义具体的任务进程,具体的工作任务是什么

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager,继承BaseManager,用于后面创建管理器
class QueueManager(BaseManager):
    pass

# 第一步:使用QueueManager注册用于获取Queue的方法名称
# 前面服务进程已经将队列名称暴露到网络中,
# 该任务进程注册时只需要提供名称即可,与服务进程中队列名称一致
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 第二步:连接到服务器,也就是运行服务进程代码的机器
server_addr = '127.0.0.1'
print("Connet to server %s..." % server_addr)
# 创建一个管理器实例,端口和验证口令保持与服务进程中完全一致
m = QueueManager(address=(server_addr, 8001), authkey=b'abc')
# 连接到网络服务器
m.connect()

# 第三步:从网络上获取Queue对象,并进行本地化,与服务进程是同一个队列
task = m.get_task_queue()
result = m.get_result_queue()

# 第四步:从task队列获取任务,并把结果写入到resul队列
for i in range(10):
    try:
        # 前面服务进程向task队列中放入了n,这里取出n
        # n和n相乘,并将相乘的算式和结果放入到result队列中去
        n = task.get(timeout=1) # 每次等待1秒后取出任务
        print("run task %d * %d..." % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print("task queue is empty.")

# 任务处理结束
print("worker exit.")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

启动分布式进程

  • 先运行服务进程,再运行任务进程

服务进程运行结果

Start!
Put task 2 ...
Put task 0 ...
Put task 0 ...
Put task 6 ...
Put task 6 ...
Put task 8 ...
Put task 8 ...
Put task 4 ...
Put task 1 ...
Put task 1 ...
Try get results...
Result: 2 * 2 = 4
Result: 0 * 0 = 0
Result: 0 * 0 = 0
Result: 6 * 6 = 36
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 8 * 8 = 64
Result: 4 * 4 = 16
Result: 1 * 1 = 1
Result: 1 * 1 = 1
result queue is empty.
master exit.

Process finished with exit code 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

任务进程运行结果

Connet to server 127.0.0.1...
run task 2 * 2...
run task 0 * 0...
run task 0 * 0...
run task 6 * 6...
run task 6 * 6...
run task 8 * 8...
run task 8 * 8...
run task 4 * 4...
run task 1 * 1...
run task 1 * 1...
worker exit.

Process finished with exit code 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

知识补充1

当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)

知识补充2

其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。
因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,
QueueManager.register(‘get_task_queue’, callable=return_task_queue)
QueueManager.register(‘get_result_queue’, callable=return_result_queue)
这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。

知识补充3

这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:
在这里插入图片描述
而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。

authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。

原文链接:https://blog.csdn.net/u011318077/article/details/88094583

原文地址:https://www.cnblogs.com/eihouwang/p/14198730.html