parallel python多进程集群模式

parallel python作为轻量级的python分布式框架,为用python做简单的分布式计算提供了很大的方便,而且使用也简单。
主要分为单机模式和集群模式:

单机模式

单机模式就是本机上进行多进程,这与multiprocess的多进程类似,甚至表现不是那么好(主要是体现在速度上面,比multiprocess还是要慢一些),其代码如下:

import pp
import time

def time_delay(n):
	time.sleep(n)

def main_func(n):
	t_time = time.time()  # 使用了time模块,需要在submit里面添加次模块。如果不添加,也可以直接在函数里面直接导入模块。
	time_delay(n)  # 调用其他函数,需要在submit里面添加此函数
	a = 0
	for k in range(n+1):
		a += k
	print(f'job: {n} takes {time.time() - t_time}s')
	return a


if __name__ == "__main__":
	ppservers = ()  # 远程服务端ip和端口号,为空就是本机
	job_server = pp.Server(ncpus=5, ppservers=ppservers)  # ncpus:本机上运行进程数量
	# 生成[(n1, job1), (n2, job2),...] jobs列表
	jobs = [job_server.submit(main_func,  # 主函数
	                           (n,),  # 函数的参数
	                           depfuncs=(time_delay,),  # 函数内会调用到的其他函数,需要传入函数
	                           modules=("time", )  # 需要调用的模块,通过字符串类型传入。注意:从此处引入模块到 主函数中,外部导入的模块不能使用import...as,也不能用from ... import,只能用import。如果是在函数内部导入模块无所谓。
	                           )
	        for n in range(20)]

	# 遍历jobs,执行函数
	result_list = []  # 接收返回结果
	for job in jobs:
		rt = job()  # 执行函数,获取返回值。如果函数无返回值,无需用变量接收,直接执行函数即可。
		result_list.append(rt)

	job_server.destroy()  # 销毁进程
	print(result_list)

集群模式

在集群内多进程,与单机的区别是,单机只能在本机上进行多进程。而集群内多进程可以在集群内分配进程,可以有效避免某些机器超负荷运作,而有些机器在闲置的情况。

  • 与单机模式的区别是:集群模式需要先启动其他服务器作为客户端(想要调用哪些服务器,就要在里面启动),
  • 启动方法:
    - 找到ppserver.py文件。安装包的python2或python3下面有
    - 通过此命令,启动文件:python ppserver.py - p 3505 - w 10 - i 192.168.0.231 - s "123456"
    - 上面命令中:-p是自定义端口号,-w是服务端进程数量(可以与本机不同,不同节点上的都可以不同), -i是启动ppserver.py服务器的ip,-s是自定义密码
    - 查询pp进程:ps -ef | grep python | grep pp | grep user_name # user_name是登录服务器的账号名
  • 最后在本机中,运行以下代码,即可在本机和其他机器上同时进行多进程任务


import pp
import time

def time_delay(n):
	time.sleep(n)

def main_func(n):
	t_time = time.time()  # 使用了time模块,需要在submit里面添加次模块。如果不添加,也可以直接在函数里面直接导入模块。
	time_delay(n)  # 调用其他函数,需要在submit里面添加此函数
	a = 0
	for k in range(n+1):
		a += k
	print(f'job: {n} takes {time.time() - t_time}s')
	return a


if __name__ == "__main__":
	ppservers = ('192.168.0.231:3505',)  # 注意:这里需要填写远端服务器的ip和端口号
	job_server = pp.Server(ncpus=5, ppservers=ppservers, secret='123456')  # ncpus:本机进程数量,需要输入远端服务器启动ppserver时候的密码
	# 生成[(n1, job1), (n2, job2),...] jobs列表
	jobs = [job_server.submit(main_func,  # 主函数
	                           (n,),  # 函数的参数
	                           depfuncs=(time_delay,),  # 函数内会调用到的其他函数,需要传入函数
	                           modules=("time", )  # 需要调用的模块,通过字符串类型传入。注意:从此处引入模块到 主函数中,外部导入的模块不能使用import...as,也不能用from ... import,只能用import。如果是在函数内部导入模块无所谓。
	                           )
	        for n in range(20)]

	# 遍历jobs,执行函数
	result_list = []  # 接收返回结果
	for job in jobs:
		rt = job()  # 执行函数,获取返回值。如果函数无返回值,无需用变量接收,直接执行函数即可。
		result_list.append(rt)

	job_server.destroy()  # 销毁进程
	print(result_list)


#### 结果输出如下图所示:

原文地址:https://www.cnblogs.com/jaysonteng/p/13426781.html