EffectivePython并发及并行

并发(concurrency):计算机似乎是在同一时间做着很多不同的事;单核CPU,迅速切换,交错执行,无法提速。

并行(parallelism):计算机确实是在同一时间做着很多不同的事;多核CPU,提速。

第36条:用subprocess模块来管理子进程

Python中有许多种运行子进程的方式,如popen、popen2和os.exec*等。对于当今的Python来说,最好用且最简单的子进程管理模块,应该是内置的subprocess模块。用subprocess模块运行子进程,是比较简单的。subprocess模块让我们非常方便地启动一个子进程,并且控制其输入输出。

# 用Popen构造器来启动进程
proc = subprocess.Popen(
        ['echo', 'Hello from the child!'],  # shell命令,echo用于字符串的输出,echo string  
        stdout=subprocess.PIPE)
# 用communicate方法读取子进程的输出信息,并等待其终止
out, err = proc.communicate()  # 通信传输的是字节流
print(out.decode('utf-8'))    
def communicate(self, input=None, timeout=None):
"""Interact with process: Send data to stdin and close it. Read data from stdout and stderr, until end-of-file isreached.  Wait for process to terminate.
The optional "input" argument should be data to be sent to the child process, or None, if no data should be sent to the child.communicate() returns a tuple (stdout, stderr).
By default, all communication is in bytes, and therefore any "input" should be bytes, and the (stdout, stderr) will be bytes.If in text mode (indicated by self.text_mode), any "input" should be a string, and (stdout, stderr) will be strings decoded
according to locale encoding, or by "encoding" if set. Text mode is triggered by setting any of text, encoding, errors or universal_newlines."""

子进程将会独立于父进程而运行,这里的父进程,指的是Python解释器。在下面这个范例程序中,可以一边定期查询子进程的状态,一边处理其他事务。

 1 proc = subprocess.Popen(['sleep', '0.3'])  #  args --> [命令,参数]
 2 while proc.poll() is None:
 3     print('working...')
 4     # Some time-consuming work here
 5     # ...
 6 print('Exit status', proc.poll())
 7 
 8 >>>
 9 Working...
10 Working...
11 Exit status 0

把子进程从父进程中剥离(decouple,解耦),意味着父进程可以随意运行很多条平行的子进程。为了实现这一点,我们可以先把所有的子进程都启动起来。

 1 def run_sleep(period):
 2     proc = subprocess.Popen(['sleep', str(period)])
 3     return proc
 4 
 5 start = time()
 6 procs = []
 7 for _ in range(10):
 8     proc = run_sleep(0.1)
 9     procs.append(proc)
10 
11 # 通过communicate方法,等待这些子进程完成其I/O工作并终结
12 for proc in procs:
13     proc.communicate()
14 end = time()
15 print('Finished in %.3f seconds' % (end-start))
16 
17 >>>
18 Finished in 0.117 seconds
# 假如这些子进程逐个运行,而不是平行运行,那么总的延迟时间就会达到1秒钟,
# 而不会像本例这样,只用了0.1秒左右就运行完毕

开发者也可以从Python程序向子进程输送数据,然后获取子进程的输出信息。这使得我们可以利用其他程序来平行地执行任务。例如,要用命令行式的openssl工具加密一些数据。下面这段代码,能够以相关的命令行参数及I/O管道,轻松地创建出完成此功能所需的子进程。

def run_openssl(data):
    env = os.environ.copy()
    env['password'] = b'xe24U
xd0Q13Sx11'
    proc = subprocess.Popen(
          ['openssl', 'enc', '-des3', 'pass', 'env: password'],
          env=env,
          stdin=subprocess.PIPE,
          stdout=subprocess.PIPE
    )    
    proc.stdin.write(data)
    proc.stdin.flush() # Ensure the child gets input
    return proc

然后,把一些随机生成的字节数据,传给加密函数。在实际工作中,传入的应该是用户输入信息、文件句柄、网络套接字等内容。

procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    procs.append(proc)

接下来,这些子进程就可以平行地运作并处理它们的输入信息了。此时,主程序可以等待这些子进程运行完毕,然后获取它们最终的输出结果:

for proc in procs:
    out, err = proc.communicate()
    print(out[-10:])

此外,我们还可以像UNIX管道那样,用平行的子进程来搭建平行的链条,所谓搭建链条,就是把第一个子进程的输出,与第二个子进程的输入联系起来,并以此方式继续拼接下去。

 1 def run_md5(input_stdin):
 2     proc = subprocess.Popen(
 3             ['md5'],
 4             stdin=input_stdin,
 5             stdout=subprocess.PIPE
 6     )
 7     return proc
 8 
 9 input_procs = []
10 hash_procs = []
11 for _ in range(3):
12     data = os.urandom(10)
13     proc = run_openssl(data)
14     input_procs.append(proc)
15     hash_proc = run_md5(proc.stdout)
16     hash_procs.append(hash_proc)
17 
18 for proc in input_procs:
19     proc.communicate()
20 for proc in hash_procs:
21     out, err = proc.communicate()
22     print(out.strip())

如果你担心子进程一直不终止,或担心它的输出管道及输出管道由于某些原因发生了阻塞,那么可以给communicate方法传入timeout参数。该子进程若在指定时间段内没有给出响应,communicate方法则会抛出异常,我们可以在处理异常的时候,终止出现意外的子进程。

proc = run_sleep(10)
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()
print('Exit status', proc.poll())
原文地址:https://www.cnblogs.com/liushoudong/p/12349142.html