Python并发编程之多进程(实战)

一、multiprocessing和Process

multiprocessing提供了支持子进程、通信和数据共享、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件

创建进程的类:

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务 
3 args表示调用对象的位置参数元组,args=(1,2,'egon',) 
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} 
5 name为子进程的名称

方法介绍:

1 p.start():启动进程,并调用该子进程中的p.run() 
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True 
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  

属性介绍:

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid 
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

二、Process类的使用

注意:在windows中Process()必须放到# if name == 'main':下,否则会启动无限启动的新进程

创建并开启进程的两种方式

# 方式一

from multiprocessing import Process
import time


def task(name):
    print('%s is running' % name)
    time.sleep(3)
    print('%s is done' % name)


if __name__ == '__main__':
    # Process(target=task, kwargs={'name':'子进程1'})
    p = Process(target=task, args=('子进程1',))
    p.start()  # 仅仅只是给操作系统发送了一个信号

    print('主进程')


# 方式二

from multiprocessing import Process
import time


class Myprocess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 一定要叫这个名字,不能是别的
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is done' % self.name)


if __name__ == '__main__':
    p = Myprocess('子进程1')
    p.start()

    print('主进程')

进程之间的内存是隔离的

from multiprocessing import Process

n = 100  # 在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了


def work():
    global n
    n = 0
    print('子进程内: ', n)  # 子进程内:  0


if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    p.join()
    print('主进程内: ', n)  # 主进程内:  100

用并发实现socket通信

# server
from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn, addr = server.accept()
        p = Process(target=talk, args=(conn, addr,))
        p.start()

# client

from socket import *

client = socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8080))

while True:
    inp = input('>>>').strip()
    if not inp:break
    client.send(inp.encode('utf-8'))
    res = client.recv(1024).decode('utf-8')
    print(res)

这样每进来一个客户端就要打开一个进程,消耗太大,解决方案是进程池。

Process对象的Join方法

from multiprocessing import Process
import time
import os

# getpid获得当前进程id
# getppid获得当前进程的父进程id

def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    p = Process(target=task)
    p.start()

    #p.join()  # 这样不过去,主进程永远不会运行
    p.join(0.0001)  # 等待0.0001秒后就往下执行
    print('主进程', os.getpid(), os.getppid())
    print(p.pid)
    
    
 # join方法只是让主进程等,其他进程还是在运行的。实现的依然是并发效果。

from multiprocessing import Process
import time


def task(name, n):
    print('%s is running' % name)
    time.sleep(n)


if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=('子进程1', 5))
    p2 = Process(target=task, args=('子进程2', 3))
    p3 = Process(target=task, args=('子进程3', 2))

    p_l = [p1, p2, p3]
    for p in p_l:
        p.start()

    for p in p_l:
        p.join()

    print('主进程', time.time() - start) # 主进程 5.008474111557007  运行时间取决于运行时间最长的进程
    
# 改成串行
from multiprocessing import Process
import time

def task(name,n):
    print('%s is running' % name)
    time.sleep(n)


if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task,args=('edward',3))
    p2 = Process(target=task,args=('egon',2))
    p3 = Process(target=task,args=('alex',5))


    p1.start()
    p1.join()

    p2.start()
    p2.join()

    p3.start()
    p3.join()
    
    print('主进程',time.time() - start)

is_alive、terminate和name方法

# is_alive
from multiprocessing import Process
import time
import os


def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    print(p.is_alive())  # True
    p.join()

    print('主进程', os.getpid(), os.getppid())
    print(p.pid)
    print(p.is_alive())  # False
    
 # terminate
from multiprocessing import Process
import time
import os


def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    # p = Process(target=task)
    # p.start()
    # p.terminate()  # 这一步也只是给操作系统发信号
    # print(p.is_alive())  # True
    #
    # print('主进程', os.getpid(), os.getppid())

    p = Process(target=task)
    p.start()
    p.terminate()  # 这一步也只是给操作系统发信号,操作系统需要反应一段时间
    time.sleep(3)
    print(p.is_alive())  # False

    print('主进程', os.getpid(), os.getppid())
    
# name
from multiprocessing import Process
import time
import os


def task():
    print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
    time.sleep(3)
    print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    p = Process(target=task, name='sub-process')
    p.start()
    print('主进程', os.getpid(), os.getppid())
    print(p.name)  # sub-process

三、孤儿进程和僵尸进程

僵尸进程(有害):一个进程使用fork创建子进程,如果子进程退出,二父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵尸进程**

孤儿进程(无害):一个父进程退出,它的一个或多个子进程还在运行,这些进程就是孤儿进程。孤儿进程会统一被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作

僵尸进程会产生进程描述符,如果父进程不调用wait或waitpid的话,这些描述符就不会释放,而进程号就会被一直占用,可是一个系统的进程号又是有限的,如果大量产生僵尸进程,就会导致系统没有可用的系统号而无法产生新的进程。

孤儿进程没有危害,因为init会统一处理掉它们。

 

僵尸进程与孤儿进程测试

import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print('im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
#执行pid=os.fork()则会生成一个子进程
#返回值pid有两种值:
#    如果返回的pid值为0,表示在子进程当中
#    如果返回的pid值>0,表示在父进程当中
if pid > 0:
    print('father died..')
    sys.exit(0)

# 保证主线程退出完毕
time.sleep(1)
print('im child', os.getpid(), os.getppid())

执行文件,输出结果:
im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子进程已经被pid为1的init进程接收了,所以僵尸进程在这种情况下是不存在的,存在只有孤儿进程而已,孤儿进程声明周期结束自然会被init来销毁。
 
# 僵尸进程测试
from multiprocessing import Process
import time,os

def run():
    print('',os.getpid())

if __name__ == '__main__':
    p=Process(target=run)
    p.start()
    
    print('',os.getpid())
    time.sleep(1000)
               
# 父进程结束后才会调用wait/waitpid去回收僵尸进程,如果父进程是一个死循环,就永远不会结束。

# 解决方法一:杀死父进程
# 解决方法二:对开启的子进程使用join,join会回收僵尸进程。即使回收后也能看到被回收子进程的pid,只是没有意义了。

四、守护进程

主进程创建一个子进程时,可以把该进程设置成自己的守护进程,只要主进程一结束,守护进程也跟着结束。

需要注意两点:

一:守护进程会在主进程代码执行结束后终止。

二:进程之间是互相独立的。

三:守护进程内无法再开启子进程,否则抛出异常。

应用场景:子进程的任务在主进程的任务结束后就没有存在的必要了。

from multiprocessing import Process
import time


def task(name):
    print('%s is talking' % name)
    time.sleep(2)
    p = Process(target=time.sleep, args=(3,))
    p.start()


if __name__ == '__main__':
    p = Process(target=task, args=('子进程1',))
    p.daemon = True  # 一定要在开始之前设置守护进程,否则会报错
    p.start()
    # p.daemon = True  AssertionError: process has already started

    # p.join()  # daemonic processes are not allowed to have children 守护进程里面不能在开子进程
    print('')  # 主进程运行完后,子进程(守护进程)就跟着一起死了,根本不会运行
    
 
from multiprocessing import Process

import time


def foo():
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    print(456)
    time.sleep(3)
    print("end456")


if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)

    p1.daemon = True
    p1.start()  # 运行不起来,主进程运行完后就立马死了
    p2.start()
    print("main-------")

五、互斥锁

进程之间共享一套文件系统,访问同一个文件、终端会带来竞争,竞争会带来混乱,这就需要锁了,锁可以让进程之间有序,保证数据不乱。

from multiprocessing import Process, Lock
import time


def task(name,mutex):
    mutex.acquire()
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    print('%s 3' % name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()  # 谁先抢到这把锁,谁先运行代码,直到抢到锁的进程运行完后,把锁释放掉,其他代码才能运行。这样牺牲了效率,但是保证了数据不错乱
    for i in range(3):
        p = Process(target=task, args=('进程%s' % i,mutex))
        p.start()

需要注意的是锁是局部串行

模拟抢票系统

# db {"count": 1}
import json
import time
from multiprocessing import Process, Lock


def search(name):
    time.sleep(1)
    dic = json.load(open('db', 'r', encoding='utf-8'))
    print('<%s>  查看到剩余票数[%s]' % (name, dic['count']))


def get(name):
    time.sleep(1)
    dic = json.load(open('db', 'r',encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        json.dump(dic, open('db', 'w', encoding='utf-8'))
        print('%s 购票成功' % name)


def task(name, mutex):
    search(name)  # 算数功能是并发的
    mutex.acquire()  # IO操作时串行的
    get(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()

    for i in range(10):
        p = Process(target=task, args=('路人%s' % i, mutex))
        p.start()
        
        # 和join的区别
        # p.join()   如果加join的话,那么连search都串行了,而互斥锁可以把代码局部串行。

六、队列

加锁的问题是效率低、需要自己加锁处理,因此我们需要找一种效率高、帮我们处理好锁问题的方法。这就是队列和管道。

队列和管道可以把我们从复杂的锁问题中解脱出来,而队列又是基于(管道+锁)实现的,所以队列是进程间通信的最佳选择。

 

创建队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

方法介绍

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3 q.get_nowait():同q.get(False)
4 q.put_nowait():同q.put(False) 
5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
    
# 其他方法
1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

队列的使用

from multiprocessing import Queue

q = Queue(3)
q.put({'a':1})
q.put('hello')

q.put([3,3,3])
print(q.full()) # True

# q.put(123) # 卡住

print(q.get())
print(q.get())
print(q.get())
print(q.empty())  #True

# q.get()  #  卡住

生产消费者模型

在并发编程中使用生产消费者模型能够解决大多数问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么使用生产者消费模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。如果生产者生产速度很快,消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据,反之亦然。生产者消费者模型就是解决这个问题的。

什么是生产者消费模型

生产者消费模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者把数据扔了一个阻塞列队,不用等待消费者;消费者不找生产者要数据,直接从阻塞列队取。

from multiprocessing import Process, Queue
import time


def producer(q):
    for i in range(1, 11):
        res = '包子%s' % i
        time.sleep(0.5)
        print('生产者生产了%s' % res)

        q.put(res)


def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消费者吃了%s' % res)


if __name__ == '__main__':
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)

    print('')

JoinableQueue([maxsize])

joinablequeue就是一个可以被join的queue。说白了就是允许消费者通知生产者项目已经被成功处理。

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
from multiprocessing import Process, JoinableQueue  # 和Queue用法一样,只是可以被join
import time


def producer(q):
    for i in range(1, 3):
        res = '包子%s' % i
        time.sleep(0.5)
        print('生产者生产了%s' % res)

        q.put(res)
    q.join()  # q.join()是等队列执行完,那队列什么时候执行完呢?取没了就执行完了,所以放到这。 这个是为了取代 put(None)这种写法


def consumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消费者吃了%s' % res)
        q.task_done()  # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 如果不给消费者设置守护进程的话,while循环还在,程序就会卡在q.get()这,等着收参数,但是生产者已经发送完了
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print('')  # 代码到这里后,生产者把东西put完了,消费者也get完了,消费者已经没有存在的意义了,所以给消费者设置了守护进程

 

 

原文地址:https://www.cnblogs.com/lshedward/p/10250284.html