python 分布式进程

# -*- coding:gbk -*-
#master.py
from multiprocessing.managers import BaseManager
import random,time,queue

#定义发送任务的队列:
task_queue = queue.Queue()
#定义接受任务的队列:
result_queue = queue.Queue()


def return_task_queue():
global task_queue
return task_queue


def return_result_queue():
global result_queue
return result_queue

class QueueManager(BaseManager):
pass
if __name__ == '__main__':
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
manager.start()
task = manager.get_task_queue()
result = manager.get_result_queue()
for i in range(10):
rand = random.randint(0,10000)
print('Put:%s' % rand)
task.put(rand)
print('Try get results...')
for i in range(10):
r = result.get(timeout =10)
print('result:%s' % r)
manager.shutdown()
print('master exit!')


# -*- coding:gbk -*-
#worker.py
from multiprocessing.managers import BaseManager
import random,time,queue
class QueueManager(BaseManager):
pass
address = '127.0.0.1'
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
maneger = QueueManager(address=(address,5000),authkey=b'abc')
maneger.connect()

task = maneger.get_task_queue()
result =maneger.get_result_queue()
for i in range(10):
# try:
n = task.get(timeout=1)
print('%d * %d = %d' % (n,n,n*n))
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
result.put(r)
# except:
# print('task queue is empty.')
print('exit worker!')
原文地址:https://www.cnblogs.com/wuchenggong/p/8859220.html