python3 分布式进程(跨机器)BaseManager(multiprocessing.managers)

A机器负责发送任务和接受结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#task_master.py
import random,time,queue
from multiprocessing.managers import BaseManager
 
task_queue = queue.Queue()
result_queue = queue.Queue()
 
class QueueManager(BaseManager):
    pass
 
if __name__ == '__main__':
    print("master start.")
    QueueManager.register('get_task_queue',callable = lambda:task_queue)
    QueueManager.register('get_result_queue',callable = lambda:result_queue)
    manager = QueueManager(address = ('10.10.100.11',9833),authkey=b'abc')
    manager.start()
    task = manager.get_task_queue()
    result = manager.get_result_queue()
 
    for in range(10):
        = random.randint(0,1000)
        print('put task %d ...' % n)
        task.put(n)
    print('try get results...')
 
    for in range(10):
        = result.get(timeout = 100)
        print('Result:%s' % r)
    manager.shutdown()
    print('master exit.')

B机器负责处理任务和发送结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#task_worker.py
import sys,time,queue
from multiprocessing.managers import BaseManager
 
class QueueManager(BaseManager):
    pass
 
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
 
server_addr = '10.10.100.11'
print('connect to server %s...' % server_addr)
 
= QueueManager(address=(server_addr,9833),authkey=b'abc')
m.connect()
 
task = m.get_task_queue()
result = m.get_result_queue()
 
for in range(10):
    try:
        = task.get(timeout = 10)
        print('run task %d * %d' %(n,n))
        = '%d * %d = %d' %(n,n,n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty')
 
print('worker exit'

原文地址:https://www.cnblogs.com/ExMan/p/10187599.html