Python process (进程)

进程 (process)

进程是对各种资源管理的集合,包含对各种资源的调用、内存的管理、网络接口的调用
进程要操作 CPU 必须先启动一个线程,启动一个进程的时候会自动创建一个线程,进程里的第一个线程就是主线程
程序执行的实例
有唯一的进程标识符(pid)

multiprossing 模块

启动进程

示例:

import multiprocessing
import time


def process_run(n):
    time.sleep(1)
    print('process', n)


for i in range(10):
    p = multiprocessing.Process(target=process_run, args=(i, ))
    p.start()

所有进程都是由父进程启动的

示例:

import multiprocessing
import os


def show_info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id', os.getpid())
    print('

')


def f(name):
    show_info('function f')
    print(name)


if __name__ == '__main__':
    show_info('main process line')
    p = multiprocessing.Process(target=f, args=('children process', ))
    p.start()

进程间通信

线程间共享内存空间,进程间只能通过其他方法进行通信

Queue

注意这个 Queue 不同于 queue.Queue
Queue type using a pipe, buffer and thread
两个进程的 Queue 并不是同一个,而是将数据 pickle 后传给另一个进程的 Queue
用于父进程与子进程之间的通信或同一父进程的子进程之间通信


示例:

from multiprocessing import Process, Queue


def p_put(*args):
    q.put(args)
    print('Has put %s' % args)


def p_get(*args):
    print('%s wait to get...' % args)
    print(q.get())
    print('%s got it' % args)


q = Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()

输出结果:
Has put p1
p2 wait to get...
('p1',)
p2 got it


换成 queue 示例:

from multiprocessing import Process
import queue


def p_put(*args):
    q.put(args)
    print('Has put %s' % args)


def p_get(*args):
    print('%s wait to get...' % args)
    print(q.get())
    print('%s got it' % args)


q = queue.Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()

输出结果:
Has put p1
p2 wait to get...


由于父进程启动子进程时是复制一份,所以每个子进程里也有一个空的队列,但是这些队列数据独立,所以 get 时会阻塞

Pipe

Pipe(管道) 是通过 socket 进行进程间通信的
所以步骤与建立 socket 连接相似:
建立连接、发送/接收数据(一端发送另一端不接受就会阻塞)、关闭连接
示例:

from multiprocessing import Pipe, Process


def f(conn):
    conn.send('send by child')
    print('child recv:', conn.recv())
    conn.close()


parent_conn, child_conn = Pipe()	# 获得 Pipe 连接的两端
p = Process(target=f, args=(child_conn, ))
p.start()
print('parent recv:', parent_conn.recv())
parent_conn.send('send by parent')
p.join()

输出结果:
parent recv: send by child
child recv: send by parent

进程间数据共享

Manager

Manager 实现的是进程间共享数据
支持的可共享数据类型:

list
dict
Value
Array
Namespace
Queue 				queue.Queue
JoinableQueue		queue.Queue
Event 				threading.Event
Lock 				threading.Lock
RLock 				threading.RLock
Semaphore 			threading.Semaphore
BoundedSemaphore 	threading.BoundedSemaphore
Condition 			threading.Condition
Barrier 			threading.Barrier
Pool 				pool.Pool

示例:

from multiprocessing import Manager, Process
import os


def func():
    m_dict['key'] = 'value'
    m_list.append(os.getpid())


manager = Manager()
m_dict = manager.dict()
m_list = manager.list()
p_list = []
for i in range(10):
    p = Process(target=func)
    p.start()
    p_list.append(p)
for p in p_list:
    p.join()
print(m_list)
print(m_dict)

进程锁

打印时可能会出错,加锁可以避免
示例:

from multiprocessing import Lock, Process


def foo(n, l):
    l.acquire()
    print('hello world', n)
    l.release()


lock = Lock()
for i in range(100):
    process = Process(target=foo, args=(i, lock))
    process.start()

进程池 (pool)

同一时间最多有几个进程在 CPU 上运行
示例:

from multiprocessing import Pool
import time
import os


def foo(n):
    time.sleep(1)
    print('In process', n, os.getpid())
    return n


def bar(*args):
    print('>>done: ', args, os.getpid())


pool = Pool(processes=3)
print('主进程: ', os.getpid())
for i in range(10):
    # pool.apply(func=foo, args=(i, ))
    pool.apply_async(func=foo, args=(i, ), callback=bar)
print('end')
pool.close()
pool.join()

从程序运行过程中可以看出:同一时间最多只有3个进程在运行,类似于线程中的信号量
主进程在执行 callback 函数

注意

1.
pool.apply(func=foo, args=(i, )) 是串行执行
pool.apply_async(func=foo, args=(i, ), callback=bar) 是并行执行
2.
callback 函数会以 target 函数返回结果为参数,在 target 函数执行结束之后执行
callback 函数是主进程调用的
3.
如果不执行 join,程序会在主进程执行完成之后直接结束,不会等待子进程执行完成
Pool.join() 必须在 Pool.close() 之后执行,否则会报错:ValueError: Pool is still running

原文地址:https://www.cnblogs.com/dbf-/p/11127000.html