W9_multiprocess_Process_Lock_Semaphore_Event_Queue_JoinableQueue

W9_multiprocess_Process_Lock_Semaphore_Event_Queue_JoinableQueue

多进程中的方法join

from multiprocessing import Process
import os

def test1(p1,p2):
    print("func test1")
    print("func test1 p:",p1, p2)
    print("func test1 pid:", os.getpid())
    print("func test1 ppid:", os.getppid())

if __name__ == "__main__":
    p = Process(target=test1, args = ("1","2"))
    p.start()
    print("*" * 10)
    print("main pid:", os.getpid())
    print("main ppid:", os.getppid())

进程生命周期

开启了子进程的主进程:
主进程自己的代码如果长,等待自己的代码结束
子进程的执行时间长,主进程会在主进程代码执行完毕之后等待子进程执行完毕之后,主进程才结束

返回顶部

join方法

等待子进程执行完成后再让主进程结束

from multiprocessing import Process
import os
import time

def test1(p1,p2):
    print("func test1")
    time.sleep(5)
    print("func test1 p:",p1, p2)


if __name__ == "__main__":
    p = Process(target=test1, args = ("1","2"))
    p.start()
    print("*" * 10)
    p.join()
    print("执行完了")

等待多个子进程执行完成后再让主进程结束

from multiprocessing import Process
import os
import time

def test1(p1,p2):
    print("func test1")
    print("*" * p1)
    time.sleep(5)
    print("*" * p2)


if __name__ == "__main__":
    p_list = []
    for i in range(10):
        p = Process(target=test1, args = (5*i, 10*i))
        p_list.append(p)
        p.start()

    [p.join() for p in p_list]
    print("执行完了")

继承Process类的启动方式


from multiprocessing import Process

class MyProcess(Process):   #定义一个类,继承Process
    def __init__(self,args1,args2):
        super().__init__()
        self.args1 = args1
        self.args2 = args2

    def run(self):   #重写run方法,在底层的start方法中调用run方法
        print(self.pid)
        print(self.name)
        print(self.args1)
        print(self.args2)

if __name__ == "__main__":
    p1 = MyProcess("test_p1", "test_p2")
    p1.start()
    p2 = MyProcess("test_p1_2", "test_p2_2")
    p2.start()

返回顶部

使用多进程实现socket服务端的并发效果

server:


import socket
from multiprocessing import Process


def func_p(conn, add, msg="hello"):
    data = conn.recv(1024)
    print(data.decode("utf-8"))
    msg = msg + "your addr:" + str(add[0]) + ":" + str(add[1])
    conn.send(msg.encode("utf-8"))
    conn.close()


sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(("127.0.0.1", 8080))
sk.listen(5)
while True:
    print("awaiting client connection")
    conn, addr = sk.accept()
    p = Process(target=func_p, args=(conn, addr))
    p.start()
    print("replying", addr)

sk.close()

client:

import socket

sk = socket.socket()
sk.connect(("127.0.0.1", 8080))
msg = input(">>>").strip()
# sk.send(b"hello, from client1")
sk.send(msg.encode("utf-8"))
data = sk.recv(1024)
print(data.decode("utf-8"))
sk.close()


返回顶部

守护进程和几个常用方法

守护进程 会 随着 主进程的代码执行完毕 而 结束
在主进程内结束一个子进程 p.terminate()
结束一个进程不是在执行方法之后立即生效,需要一个操作系统响应的过程
检验一个进程是否活着的状态 p.is_alive()
p.name p.pid 这个进程的名字和进程号

import time
from multiprocessing import Process

def func():
    while True:
        time.sleep(0.2)
        print('我还活着')

def func2():
    print('in func2 start')
    time.sleep(8)
    print('in func2 finished')

if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True   # 设置子进程为守护进程
    p.start()
    p2 = Process(target=func2)
    p2.start()
    p2.terminate()     # 结束一个子进程
    time.sleep(1)
    print(p2.is_alive())  # 检验一个进程是否还活着
    print(p2.name)

返回顶部

进程锁

lock = Lock() #实例化锁对象
lock.acquire() #拿钥匙进门
lock.release() # 还钥匙

import json
import time
from multiprocessing import Process,Lock


def show_tickets(user_id):
    with open("ticket", "r") as f:
        ticket = json.load(f)
        '''
        {"ticket": 1}
        '''
        print(str(user_id) + ":", ticket)


def buy_ticket(user,lock):
    lock.acquire()   #拿钥匙进门
    #查询余票
    with open("ticket", "r") as f:
        data = json.load(f)
    time.sleep(0.01)
    print("余票数:%s" % data["ticket"])
    #开始买票
    if data["ticket"] > 0:
        data["ticket"] -= 1
        print("33[32m %s got ticket 33[0m" % user)
    else:
        print("33[31m %s didn't get any ticket ! 33[0m" % user)
    time.sleep(0.1)
    # 并买票结果回写到数据库
    with open("ticket", "w") as f:
        json.dump(data,f)
    lock.release() # 还钥匙

def main():
    for user_id in range(10):
        p = Process(target=show_tickets, args=(user_id,))
        p.start()
    lock = Lock()
    for user_id in range(10):

        p = Process(target=buy_ticket, args=(user_id,lock))
        p.start()



if __name__ == '__main__':
    main()

返回顶部

Semaphore信号量

对于一个有限资源的,同一时间,只能被N个人访问
同一段代码 ,同一时间,只能被N个进程访问

from multiprocessing import Process,Semaphore
import time
import random

def ktv(i,sem):
    sem.acquire()
    print("%s in ktv" % i)
    time.sleep(random.randrange(2,10))
    print("%s leave ktv" % i)
    sem.release()


def main():
    sem = Semaphore(4)
    for i in range(20):
        p = Process(target=ktv, args=(i,sem))
        p.start()

if __name__ == "__main__":
    main()

返回顶部

Event事件

Event属性介绍

from multiprocessing import Event

e = Event()  #初始化Event对像,默认为阻塞状态
e.set()      #设置Event对像为为True状态,即非阻塞状态
e.clear()    #设置Event对像为为False状态,即为阻塞状态
e.is_set()   #查看当前的状态为True还是False
e.wait()     #根据该对象当前被设置的状态,执行阻塞或非阻塞,True非阻塞,False阻塞
print("Finished")

Event应用之 红绿灯 事件

import time
from multiprocessing import Process, Event
import random

def lights(e):
    while True:
        if not e.is_set():
            e.set()
            print("33[42mgreen light on33[0m")
        else:
            e.clear()
            print("33[41mred light on33[0m")
        time.sleep(1)


def cars(i, e):
    if not e.is_set():
        print("33[31mcar %s is waiting33[0m" % i)
        e.wait()
    print("33[32mcar %s is passing 33[0m" % i)


def main():
    e = Event()
    l = Process(target=lights, args=(e,))
    l.daemon = True   # 设置子进程为守护进程,随主进程代码执行完毕后结束
    l.start()
    car_list = []
    for i in range(10):
        car = Process(target=cars, args=(i, e))
        car_list.append(car)
        car.start()
        time.sleep(random.random())

    [car.join() for car in car_list]



if __name__ == '__main__':
    main()

进程间的通信--队列和管道

IPC(Inter-Process Communitication):进程间的通信

队列

特点:先进,先出

from multiprocessing import Queue

q = Queue(3) #初始化队列
q.put("a")
q.put("b")
q.put("c")
print(q.full())  #队列是否满,返回True表示已满
#q.put(“d”)       #当队列满时,put方法被阻塞
print(q.get())
print(q.get())
print(q.get())
print(q.empty())  #检测队列是否为空,但是其它进程或线程正在往队列中添加数据,结果是不可靠的。也就是说在返回和使用结果之间,队列中可能加入新的数据
# print(q.get())  #当队列为空时,get方法被阻塞
# q.get_nowait()  #非阻塞式取队列值,但是若队列为空,则报异常

进程间通信示例

from multiprocessing import Queue, Process

def producer(q):
    i = 0
    while i < 5:
        q.put("Hello")
        i += 1
        time.sleep(0.5)


def consumer(q):
    i = 0
    while i < 5:
        print(q.get())
        i += 1
        time.sleep(0.5)

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p1.start()

    p2 = Process(target=consumer, args=(q,))
    p2.start()

生产者消费者模型

from multiprocessing import Process, Queue
import time
import random


def produer(name, q, production):
    for i in range(10):
        time.sleep(random.random())
        q.put(production + ":" + str(i))
        print("%s produce %s %d" % (name, production, i))


def consumer(name, q):
    while True:
        production = q.get()
        if production == None:
            print("%s got the %s,exit!" % (name, production))
            break
        print("%s consume the %s" % (name, production))
        time.sleep(random.random())


if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=produer, args=("alex", q, "baozi"))
    p2 = Process(target=produer, args=("Eva", q, "suger"))
    c1 = Process(target=consumer, args=("CC", q))
    c2 = Process(target=consumer, args=("dd", q))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)

问题:
以上模型存在的问题是对消费者进程的处理不够灵活,无法准确感知队列数据是否被消费者处理完毕

引入JoinableQueue代替Queue
JoinableQueue除拥有Queue的put,get方法外,还有新增的方法:
.join()
.task_done

from multiprocessing import JoinableQueue

q = JoinableQueue() # 初始化队列
q.join()     # 阻塞,直到队列中的所有数据被处理完成后才解除阻塞
q.task_done()  #返回一个数据被处理完毕的信号

改进后:


from multiprocessing import Process, JoinableQueue
import time
import random


def produer(name, q, production):
    for i in range(10):
        time.sleep(random.random())
        q.put(production + ":" + str(i))
        print("%s produce %s %d" % (name, production, i))
    q.join()  #表示所有数据生产完成,进程阻塞在这里,直到队列中的taks_done信号的数量与队列存放过的数据量相等才解除阻塞


def consumer(name, q):
    while True:
        production = q.get()  #当拿不到数据时,进程阻塞于此
        print("%s consume the %s" % (name, production))
        time.sleep(random.random())
        q.task_done()  #每处理完一个数据,返回一个task_doen的信号给队列


if __name__ == '__main__':
    q = JoinableQueue(3)
    p1 = Process(target=produer, args=("alex", q, "baozi"))
    p2 = Process(target=produer, args=("Eva", q, "suger"))
    c1 = Process(target=consumer, args=("CC", q))
    c2 = Process(target=consumer, args=("dd", q))
    p1.start()
    p2.start()
    c1.daemon = True #设置为守护进程,虽然进程阻塞,但会随主进程的退出而结束
    c2.daemon = True #设置为守护进程,虽然进程阻塞,但会随主进程的退出而结束
    c1.start()
    c2.start()
    p1.join()  #感知子进程是否结束,若所有子进程结束,主进程才能结束
    p2.join()  #感知子进程是否结束,若所有子进程结束,主进程才能结束

返回顶部

总结

多进程的方法

start()  #启动子进程运行
join()   #感知一个子进程的结束
terminate()  #结束一个子进程
is_alive()   #判断子进程是否还在运行

多进程属性

name   #进程的名称
pid    #进程号
daemon  #当值为True时,表示新的子进程为一个守护进程,
        #守护进程随着主进程代码的执行结束而结束
        #一定要在start()之前设置

from multiprocessing import Lock
lock = Lock() #实例化锁对象
lock.acquire()  #拿钥匙
lock.release()  #还钥匙
原文地址:https://www.cnblogs.com/rootid/p/9642902.html