分布式进程

分布式进程可以有multiprocessing模块的managers子模块支持,可以写一个服务进程作为调度者,将任务分布到其他多个进程中,依靠网络通信进行管理

taskManager.py

import random,time,queue
from multiprocessing.managers import BaseManager

# 第一步:建立task_queue和resul_queue,用来存放任务和结果
task_queue = queue.Queue()
result_queue = queue.Queue()

class Queuemanager(BaseManager):
    pass

#第二步:把创建的两个队列注册到网络上,利用register方法,callable参数关联了Queue对象
# 将queue对象在网络中暴露

Queuemanager.register('get_task_queue',callable=lambda:task_queue)

Queuemanager.register('get_result_queue',callable=lambda:result_queue)

# 第三步绑定端口,设置验证口令
manager = Queuemanager(address=('',8001),authkey='qiye'.endode('utf-8'))

# 第四步 启动管理,监听信息通道
manager.start()

# 第五步 通过管理实例的方法获取通过网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()

#第六步 :添加任务
for url in ['ImageUrl_' + str(i) for i in range(10)]:
    print('put task %s' % url)
    task.put(url)

# 获取返回结果
print('try get result...')
for i in range(10):
    print('result is %s' % result.get(timeout=10))
    
# 关闭管理
manager.shutdown()

taskManager.py

import time
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager

class QueueManager(BaseManager):
    pass

# 第一步:使用QueueManager 注册用于获取Queue的方法名称
QueueManager.register('get_task_queue')

QueueManager.register('get_result_queue')

# 第二步:连接到服务器
server_addr = '127.0.0.1'
print('connect to server %s ..' % server_addr)

#端口和验证口令注意保持和服务进程完全一致
m = QueueManager(address=(server_addr,8001),authkey='qiye'.encode('utf-8'))

# 从网络连接
m.connect()

# 第三步:获取Queue对象
task = m.get_task_queue()
result = m.get_result_queue()

#第四步:从task队列获取任务,并把结果写入result队列
while (not task.empty()):
    image_url = task.get(True,timeout=5)
    print('run task download %s' % image_url)
    time.sleep(1)
    result.put('%s --->success' % image_url)

# 处理结束
print('worker exit.')

 

原文地址:https://www.cnblogs.com/Erick-L/p/7708781.html