进程

进程  进程即正在执行的一个过程。进程是对正在运行程序的一个抽象.

开启进程的方法:

import time
import os
from multiprocessing import Process

def func(args,args2):
    print(args,args2)
    print('',os.getpid())
    print('子的父',os.getppid())

if __name__=='__main__':
    p = Process(target=func,args=('参数1','参数2'))  #主进程
          # p是一个进程对象  还没启动
    p.start()  #创建一个子进程 和父进程异步进行
    print('*'*5)
    print('',os.getpid())
    print('父的父',os.getppid())

#进程的生命周期
    #开启子进程的主进程:
        #主进程的代码如果长 等待自己的代码执行结束
        #子进程始行时间长 主进程执行完毕后 等待子进行执行完毕后 主进程才结束
函数方法
 import os
# from multiprocessing import Process
#
# class Myprocess(Process):
#     def __init__(self,args,args2):
#         super().__init__()
#         self.args = args
#         self.args2 = args2
#     def run(self):
#         print(self.pid)
#
#
# if __name__=='__main__':
#     print('主:',os.getpid())
#     p1 = Myprocess(1,2)
#     p1.start()
#     p2 = Myprocess(2,3)
#     p2.start()
继承方法

开启多个子进程:

# import time
# from multiprocessing import Process

# def func(args,args2):
#     print('*'*args)
#     time.sleep(3)
#     print('*'*args2)
#
# if __name__=='__main__':
#     p_list = []
#     for i in range(1,10):
#         p = Process(target=func,args=(1*i,2*i))
#         p_list.append(p)
#         p.start()
#     [p.join() for p in p_list]
#     print('执行完了')
多进程用list

多进程中的几种方法:

join()方法  感知一个子进程的结束
p.daemon = True    在start()前插入代码 就是将该子进程设置为守护进程
守护进程在主进程代码读完后结束,此时主进程可能还没有结束 而是在等待别的子进程执行结束才结束

p.is_alive检验一个进程是否还活着
# import time
# from multiprocessing import Process
# def fun():
#     while 1:
#         print('活着')
#         time.sleep(0.1)
# def fun2():
#     print('fun2 start')
#     time.sleep(10)
#     print('fub2 end')
#
# if __name__=='__main__':
#     p = Process(target=fun)
#     p.daemon = True    # 设置子进程为守护进程
#     p.start()
#     p2 = Process(target=fun2)
#     p2.start()
#     print(p2.is_alive())  # is_alive检验一个进程是否活着
#     print(p2.name)
#     # i = 0
    # while i<5:
    #     print('socket server')
    #     time.sleep(1)
    #     i += 1
上面方法的使用

进程锁:

多个进程同时抢一个数据就会出现数据安全问题,此时需要用进程锁限制数据,同一时间只能被一个进程操作

归还数据之后才能被另外的进程操作。此时牺牲效率保证安全

import time
import json
from multiprocessing import Process
from multiprocessing import Lock

def show(i):
    with open('ticket') as f:
        dic = json.load(f)
    print('余票:%s'%dic['ticket'])

def buy_tk(i,lock):
    lock.acquire()
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if  dic['ticket']>0:
        dic['ticket'] -= 1
        print('33[32m%s买到票了33[0m'%i)
    else:
        print('33[31m%s没买到票33[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release()

if __name__=='__main__':
    for i in range(10):
        p = Process(target=show,args=(i,))
        p.start()
    lock = Lock()
    p.join()
    for i in range(10):
        p = Process(target=buy_tk,args=(i,lock))
        p.start()
火车票问题

事件

一个信号是所有进程都进入阻塞状态,也可以控制解除阻塞

事件被创建 默认是阻塞状态

# from multiprocessing import Event,Process
# import time
# import random
# 
# def cars(i,e):
#     if not e.is_set():
#         print('33[31m%s等待33[0m'%i)
#         e.wait()
#     print('33[32m%s通过33[0m'%i)
# 
# 
# 
# def light(e):
#     while 1:
#         if e.is_set():
#             e.clear()
#             print('33[31m红灯亮了33[0m')
# 
#         else:
#             e.set()
#             print('33[32m绿灯亮了33[0m')
# 
#         time.sleep(2)
# 
# if __name__=='__main__':
#     e = Event()
#     p = Process(target=light,args=(e,))
#     p.start()
#     for i in range(20):
#         car = Process(target=cars,args=(i,e))
#         car.start()
#         time.sleep(random.random())
红绿灯问题

信息量:

同一时间只能n进程进行操作,房间上n把锁,只有等里面的出来,外面的才能再进去

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

def func(i,sem):
    sem.acquire()
    print('%i走进ktv'%i)
    time.sleep(random.randint(2,5))
    print('%i走出ktv'%i)
    sem.release()

if __name__=='__main__':
    sem = Semaphore(4)        # 给房间上锁 每次只能进4个
    for i in range(20):
        Process(target=func,args=(i,sem)).start()

队列:

队列能实现进程之间的通信

# from multiprocessing import Queue
#
# q = Queue(5)
# q.put(1)                 #put方法 往队列里放
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5)
# # q.put(5)            #若队列已满 再放会阻塞
# print(q.full())          #判断队列是否满了
# q.get()               #get方法 从队列里取
# q.get()
# q.get()
# q.get()
# q.get()
# # q.get()            #若队列为空 再取就会阻塞
# print(q.empty())     # 队列是否为空
# try:
#     q.get_nowait()       # 有值就取 没有报错 不等待 不阻塞
# except:
#     print('已经没有值了')


# 用Queue实现进程之间通信  数据传递
from multiprocessing import Queue,Process
def prodect(q):
    q.put('hello')
def consume(q):
    print(q.get())

if __name__=='__main__':
    q = Queue()
    p = Process(target=prodect,args=(q,))
    p.start()
    c = Process(target=consume,args=(q,))
    c.start()
from multiprocessing import Queue,Process
import time
import random

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        fo = '%s%s'%(food,i)
        print('%s生产了%s'%(name,fo))
        q.put(fo)

def consumer(name,q):
    while 1:
        food = q.get()
        print('33[31m%s消费了%s33[0m'%(name,food))
        time.sleep(random.randint(1,3))


if __name__=='__main__':
    q = Queue()
    p = Process(target=producer,args=('dh','包子',q))
    p2 = Process(target=producer,args=('大黄','榴莲',q))
    c = Process(target=consumer,args=('小包',q))
    c2 = Process(target=consumer,args=('二黄',q))
    p.start()
    p2.start()
    c.start()
    c2.start()
    p.join()
    p2.join()
生产者消费者模型 queue

该方法最后会阻塞,进程不会结束

from multiprocessing import JoinableQueue,Process
import time
import random

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        fo = '%s%s'%(food,i)
        print('%s生产了%s'%(name,fo))
        q.put(fo)
    q.join()  #阻塞 直到队列中的所有数据全部被处理完毕

def consumer(name,q):
    while 1:
        food = q.get()
        print('33[31m%s消费了%s33[0m'%(name,food))
        time.sleep(random.randint(1,3))
        q.task_done()

if __name__=='__main__':
    q = JoinableQueue()
    p = Process(target=producer,args=('dh','包子',q))
    p2 = Process(target=producer,args=('大黄','榴莲',q))
    c = Process(target=consumer,args=('小包',q))
    c2 = Process(target=consumer,args=('二黄',q))
    p.start()
    p2.start()
    c.daemon = True
    c2.daemon = True
    c.start()
    c2.start()
    p.join()
    p2.join()
生产者消费者模型 joinablequeue

管道:

管道的返回值是两个管道口,对应输入与输出,可以实现进程之间的通信

若管道口关闭,仍从管道内取值,管道中没有值时就会抛出异常

#生产者消费者模型用管道实现

from multiprocessing import Pipe,Process,Lock
import time
import random

def producer(con,pro,name,food):
    con.close()
    for i in range(10):
        fo = '%s%s'%(food,i)
        print('%s生产了%s'%(name,fo))
        pro.send(fo)
        time.sleep(random.randint(1,3))
    pro.close()
def consumer(con,pro,name,lock):
    pro.close()
    while 1:
        try:
            lock.acquire()
            food = con.recv()
            lock.release()
            print('%s吃了%s'%(name,food))
            time.sleep(random.randint(1,3))
        except EOFError:
            con.close()
            break


if __name__=='__main__':
    con,pro = Pipe()
    lock = Lock()
    p = Process(target=producer,args=(con,pro,'dh','包子'))
    p.start()
    c = Process(target=consumer,args=(con,pro,'大黄',lock))
    c.start()
    # c2 = Process(target=consumer,args=(con,pro,'大黄2',lock))
    # c2.start()           
    con.close()
    pro.close()

当存在多个消费者时,就是出现多个消费者抢资源,从而数据不安全。

解决方法就是加锁

管道加锁就实现了队列。

进程中的数据共享

rom multiprocessing import Manager,Process,Lock

def main(dic,l):
l.acquire()
dic['con']-=1
l.release()


if __name__=='__main__':
m = Manager()
l = Lock()
dic = m.dict({'con':100}) # 数据共享
p_lst = []
for i in range(50):
p = Process(target=main,args=(dic,l))
p.start()
p_lst.append(p)
for i in p_lst:i.join()
print(dic)

正常情况下,执行完数据的结果应该是50,但实际上每次的结果可能都不同

说明数据共享的同时,有子进程同时拿到了数据,进行了重复的操作

数据共享是不安全的,解决方法是加锁

进程池:

每开启一个子进程,都要开启一个属于这个子进程内存空间

操作系统调度效率降低

进程池:在python中创建一个装进程的池子,指定池子中有一定数量的进程,这些进程被创建好,等待被使用

每次进入一定数量的进程,一个处理完话另一个进

#    进程池 map方法 与开启多进程时间上的比较
# from multiprocessing import Process,Pool
# import time
#
# def fun(n):
#     print(n+1)
#
# if __name__=='__main__':
#     st = time.time()
#     pool = Pool(5)       # 进程池 提升效率
#     pool.map(fun,range(100))
#     t1 = time.time()-st
#
#     enn = time.time()
#     p_lst = []
#     for i in range(100):
#         p = Process(target=fun,args=(i,))
#         p.start()
#         p_lst.append(p)
#     for p in p_lst:p.join()
#     t2 = time.time()-enn
#     print(t1,t2)

map方法中要放一个可迭代的

# from multiprocessing import Pool
# import time,os
# def fun(n):
#     print('%sstart'%n,os.getpid())
#     time.sleep(0.3)
#     print('%send'%n,os.getpid())
#
# if __name__=='__main__':
#     p = Pool(5)
#     for i in range(10):
#         p.apply_async(fun,args=(i,))   #异步接收
#     p.close()   # 结束进程池接受任务
#     p.join()    # 感知进程池中的任务结束

p.apply()为同步方法

进程池的返回值

# from multiprocessing import Pool
# import time
# def fun(i):
#     time.sleep(0.5)
#     return i*i
# if __name__=='__main__':
#     p = Pool(5)
    # for i in range(10):
        # res = p.apply(fun,args=(i,))     #同步 一个一个打印
        # print(res)

    # res_l = []
    # for i in range(10):
    #     res = p.apply_async(fun,args=(i,))
    #     res_l.append(res)
    # for res in res_l:           # get方法会阻塞 放在生成进程的外面
    #     print(res.get())          #异步方法 一次打印5个(进程池为5)

    # res = p.map(fun,range(10))
    # print(res)      # 一次打印所有[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
                    # 所有数据处理完 一块返回 结果是一个列表

map简单调用异步函数 apply_async异步实现更强大

进程池的回调函数

# from multiprocessing import Pool
# import os
# def func1(n):
#     print('in func1',os.getpid())
#     return n*n
# def func2(nn):
#     print(nn,os.getpid())
#
# if __name__=='__main__':
#     print(os.getpid())
#     p = Pool(5)
#     p.apply_async(func1,args=(10,),callback=func2)  #callback回调函数
#     p.close()           # 执行func1的结果作为func2的参数传进回调函数
#     p.join()            # 回调函数在主进程空间

第一个函数的返回值 作为回调函数的参数进行处理

回调函数是主进程中的操作

回调函数常用于爬虫,子进程爬取复杂的信息,交给主进程处理,避免在切换进程过程中的网络延时

最后,一个没反应的程序,不知道哪错了,学了后面的再回来找补。

import requests
from multiprocessing import Pool
from urllib.request import urlopen

# def get(url):
#     response = requests.get(url)
#     if response.status_code == 200:
#         return url,response.content.decode('utf-8')

def get_urllib(url):
    ret = urlopen(url)
    return ret.read().decode('utf-8')

def call_back(args):
    url,content = args
    print(url,len(content))

if __name__=='__main__':
    url_lst = [
        'https://www.baidu.com'
        'https://www.sohu.com'
        'https://www.sougou.com'
    ]
    p = Pool(5)
    for ult in url_lst:
        p.apply_async(get_urllib,args=(ult,),callback = call_back)
    p.close()
    p.join()
原文地址:https://www.cnblogs.com/mu-tang/p/14194256.html