python并发_进程_multiprocessing

多进程基础, 主要是用了 multiprocessing模块 :

在一个python进程中开启子进程,start方法和并发效果。

import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    print('我是子进程')

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    time.sleep(1)
    print('执行主进程的内容了')

多进程与PID

# 多个进程可以同时执行  并发
# 我们现在写的一个py文件就自己一个进程  同步执行代码
# 并发效果:
    # 在我们自己的一个py文件里 启动多个进程
    # 多个进程之间 - 操作系统
# 如何在自己的py文件里 启动一个进程
import os
import time
from multiprocessing import Process
def func(args):
    print(args)
    time.sleep(1)
    print('子进程', os.getpid())
    print('子的父进程', os.getppid())
    print('hahaha')

if __name__ == '__main__': # windows下必须加这句
    print('父进程',os.getpid())
    p = Process(target=func,args=(555,))  # 注册  p 是一个进程对象  参数必须是元组
    p.start()                 # 启动一个子进程
    print('父父进程',os.getppid())  # 查看当前主进程的父过程  (当前pycharm的PID)

老师的:http://www.cnblogs.com/Eva-J/articles/8253549.html#_label2 

join 感知进程结束

import time
from multiprocessing import Process

def func(arg1,arg2):
    print('*'*arg1)
    time.sleep(3)
    print('*'*arg2)


if __name__ == '__main__':  # windows下必须加这句
    p = Process(target=func,args=(10,20))
    p.start()
    print('hhhhhhhh')
    p.join()  # 感知一个子进程的结束,将异步变为同步
    print('=========== 运行结束.')

多个子进程写入文件:

import time
import os
from multiprocessing import Process

# def func(arg1,arg2):
#     print(arg1)
#     time.sleep(2)
#     print('*'*arg2)
#
#
# if __name__ == '__main__':
#     p_list = []
#     for i in range(10):
#         p = Process(target=func,args=(10*i,6*i))
#         p_list.append(p)
#         p.start()
#         # p.join()
#     [p.join() for p in p_list]
#     print('over.')



def func(filename,content):
    with open(filename,'w') as f:
        f.write(str(content))


if __name__ == '__main__':   # windows下必须加这句
    p_list = []
    for i in range(1,6):   # 开5个子进程
        p = Process(target=func,args=('info%s' %i,i))
        p_list.append(p)
        p.start()

    [p.join() for p in p_list]
    # 最终每个进程写入一个文件
    print(i for i in os.walk(r'F:python_s9练习py3s9day36 process'))

OOP方式 multiprocessing

import time
import os
from multiprocessing import Process

# 自定义类,必须继承 Process
class MyProcess(Process):
    def run(self):   # 必须实现run方法,它是子进程中执行的代码
        print(self.pid)
        print(self.name)
        print(os.getpid())


if __name__ == '__main__':   # windows下必须加这句
    print('master:',os.getpid())
    p1 = MyProcess()
    p1.start()
    p2 = MyProcess()
    p2.start()
# 进程与进程之间 数据是完全隔离的。

import os
from multiprocessing import Process


def func():
    global n
    n =0
    print('pid: %s  %s' % (os.getpid(), n))


if __name__ == '__main__':
    n =100
    p = Process(target=func)
    p.start()
    p.join()
    print(os.getpid(),n)

守护进程:

# 子进程 -- > 守护进程
import time
from multiprocessing import Process

def func():
    while True:
        time.sleep(0.2)
        print('我还活着')

def func2():
    print('in func2 start')
    time.sleep(3)
    print('in func2 finished')

if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True   # 设置子进程为守护进程
    p.start()

    # i = 0
    # while i<5:
    #     print('我是socket server')
    #     time.sleep(1)
    #     i+=1

    p2 = Process(target=func2)
    p2.start()
    p2.terminate()     # 结束一个子进程
    time.sleep(1)
    print(p2.is_alive())  # 检验一个进程是否还活着
    print(p2.name)

# 守护进程 会 随着 主进程的代码执行完毕 而 结束
# 在主进程内结束一个子进程 p.terminate()
    #  结束一个进程不是在执行方法之后立即生效,需要一个操作系统响应的过程
# 检验一个进程是否活着的状态 p.is_alive()
# p.name p.pid 这个进程的名字和进程号

进程锁: lock.acquire()

# 进程锁  只在多进程时使用

# 火车票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock


# def show(i):
#     with open('ticket') as f:
#         dic = json.load(f)
#     print('余票: %s'%dic['ticket'])

def buy_ticket(i,lock):
    lock.acquire()      # 进程锁
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0 :
        dic['ticket'] -= 1
        print('33[32m%s买到票了33[0m'%i)
    else:
        print('33[31m%s没买到票33[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release()      # 释放锁

if __name__ == '__main__':
    # for i in range(10):
    #     p = Process(target=show,args=(i,))
    #     p.start()
    lock = Lock()   # 实例化锁
    for i in range(10):
        p = Process(target=buy_ticket, args=(i,lock))
        p.start()

ticket文件中只有:

{"ticket": 2}

来自 http://www.cnblogs.com/Eva-J/articles/8253549.html#_label2

多进程复习:

# 多进程代码
# from multiprocessing import Process
# 方法
    # 进程对象.start()     开启一个子进程
    # 进程对象.join()      感知一个子进程的结束
    # 进程对象.terminate() 结束一个子进程
    # 进程对象.is_alive()  查看某个子进程是否还在运行
# 属性
    # 进程对象.name        进程名
    # 进程对象.pid         进程号
    # 进程对象.daemon      值为True的时候,表示新的子进程是一个守护进程
            # 守护进程 随着主进程代码的执行结束而结束
            # 一定在start之前设置


# from multiprocessing import Lock
# l = Lock()
# l.acquire()   # 拿钥匙
# 会造成数据不安全的操作
# l.release()   # 还钥匙

子进程不能 input

from multiprocessing import Process
def func():
    num = input('>>>')  # 子进程中不能 input 原因是主进程不能感知子进程的输入
    print(num)

if __name__ == '__main__':
    Process(target=func).start()

信号量:Semaphore

# 多进程中的组件
# ktv
# 4个
# 一套资源  同一时间 只能被n个人访问
# 某一段代码 同一时间 只能被n个进程执行

import time
import random
from multiprocessing import Process, Semaphore

# sem = Semaphore(4)
# sem.acquire()
# print('拿到第一把钥匙')
# sem.acquire()
# print('拿到第二把钥匙')
# sem.acquire()
# print('拿到第三把钥匙')
# sem.acquire()
# print('拿到第四把钥匙')
# sem.acquire()
# print('拿到第五把钥匙')


def ktv(i,sem):
    sem.acquire()
    print('%s 走进ktv' %i)
    time.sleep(random.randint(1,5))
    print('%s 离开' %i)
    sem.release()


if __name__ == '__main__':
    sem = Semaphore(4)  # 信号量4个
    for i in range(20):
        p = Process(target=ktv,args=(i,sem))
        p.start()

# 用锁的原理实现的,内置了一个计数器
# 在同一时间 只能有指定数量的进程执行某一段被控制住的代码

 事件:

# 通过一个信号 来控制 多个进程 同时 执行或者阻塞
# 事件
from multiprocessing import Event

# 一个信号可以使所有的进程都进入阻塞状态
# 也可以控制所有的进程解除阻塞
# 一个事件被创建之后,默认是阻塞状态

# e = Event()  # 创建了一个事件
# print(e.is_set())   # 查看一个事件的状态,默认被设置成阻塞
# e.set()      # 将这个事件的状态改为True
# print(e.is_set())
# e.wait()     # 是依据e.is_set()的值来决定是否阻塞的
# print(123456)
# e.clear()    # 将这个事件的状态改为False
# print(e.is_set())
# e.wait()     # 等待 事件的信号被变成True
# print('*'*10)

# set 和 clear
#  分别用来修改一个事件的状态 True或者False
# is_set 用来查看一个事件的状态
# wait 是依据事件的状态来决定自己是否在wait处阻塞
#  False阻塞 True不阻塞


# 红绿灯事件
import time
import random
from multiprocessing import Event,Process

def cars(e,i):
    if not e.is_set():
        print('car%i在等待'%i)
        e.wait()    # 阻塞 直到得到一个 事件状态变成 True 的信号
    print('33[0;32;40mcar%i通过33[0m' % i)

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('33[31m红灯亮了33[0m')
        else:
            e.set()
            print('33[32m绿灯亮了33[0m')
        time.sleep(2)

if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light,args=(e,))
    traffic.start()
    for i in range(20):
        car = Process(target=cars, args=(e,i))
        car.start()
        time.sleep(random.random())

进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe)

IPC(Inter-Process Communication) 进程间通信

队列

# 队列 先进先出
# IPC  # 进程间通信
# from multiprocessing import Queue
# q = Queue(5)
# q.put(1)
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5)
# print(q.full())   # 队列是否满了
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.empty())
# while True:
#     try:
#         q.get_nowait()
#     except:
#         print('队列已空')
#         time.sleep(0.5)
# for i in range(6):
#     q.put(i)

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello')

def consume(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume, args=(q,))
    c.start()

# put  当队列满的时候阻塞等待队列有空位置
# get 当队列空的时候阻塞等待队列有数据
# full empty 这些判断不完全准确

 Kafka, rabbitMQ, memcache 等消息中间件本质上都是一种队列。

生产者 消费者 模型

# 队列
# 生产者消费者模型

# 生产者 进程
# 消费者 进程
import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('%s获取到了一个空'%name)
            break
        print('33[31m%s消费了%s33[0m' % (name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__  == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('Bob','油条', q))
    c1 = Process(target=consumer, args=(q,'Celen'))
    c2 = Process(target=consumer, args=(q,'Jerry'))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()   # 感知进程的结束
    p2.join()
    q.put(None)
    q.put(None)

JoinableQueue 更好的方式,创建生产者消费者模型:

import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    while True:  # 取不到数据一直阻塞
        food = q.get()
        print('33[31m%s消费了%s33[0m' % (name,food))
        time.sleep(random.randint(1,3))
        q.task_done()     # count - 1

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕, 进程才会结束
    # 只有 JoinableQueue 的 q 才能有此功能

if __name__  == '__main__':
    q = JoinableQueue(20)  # 使用更高级的方式代替Queue
    p1 = Process(target=producer,args=('生产部','PCB',q))
    p2 = Process(target=producer, args=('研发部','主机', q))
    c1 = Process(target=consumer, args=(q,'Jerry'))
    c2 = Process(target=consumer, args=(q,'Tom'))
    p1.start()
    p2.start()
    c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 感知一个进程的结束

### 执行过程:
#  在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功

# 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
                # 且要等待之前被刻上的记号都被消费完
                # 当数据都被处理完时,join阻塞结束

# consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
# get task_done
# put join

 管道:

from multiprocessing import Pipe,Process

## 例一:
# con1,con2 = Pipe() # 一个管道两个口
# con1.send('32222') # 管道的一头发送
# print(con2.recv()) # 另一头接收


## 例二:
# def demo(conn):
#     conn.send('are you ok?')
#
# if __name__ == '__main__':
#     conn1,conn2 = Pipe()
#     Process(target=demo,args=(conn1,)).start()
#     print('conn2 recv:', conn2.recv())  # 接收也不必写长度,不会黏包


## 例三:
# def demo(conn):
#     while 1:   # 一直接收
#         msg = conn.recv()
#         if msg is None:break
#         print(msg)
#
# if __name__ == '__main__':
#     conn1,conn2 = Pipe()
#     Process(target=demo,args=(conn1,)).start()
#     for i in range(20):
#         conn2.send('这是测试')  # 发送完会阻塞
#     conn2.send(None)    # 发送结束符号


## 例四:
def func(conn1,conn2):
    conn2.close()     # 子进程管道关闭不会影响主进程
    while True:
        try :
            msg = conn1.recv()
            print('conn1:', msg)
        except EOFError:  # 没数据可取时,主动抛出的异常
            conn1.close()
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    Process(target=func,args = (conn1,conn2)).start()
    conn1.close()    # 主进程的管道关闭不会影响子进程
    for i in range(20):
        conn2.send('吃了么')
    conn2.close()  # 发送完成后关闭

管道实现 生产者、消费者模型

import time
import random

# from multiprocessing import Pipe,Process
#
# def producer(con,pro,name,food):
#     con.close()
#     for i in range(4):
#         time.sleep(random.randint(1, 3))
#         f = '%s生产%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
#     pro.close()
#
# def consumer(con,pro,name):
#     pro.close()
#     while True:
#         try:
#             food = con.recv()
#             print('%s吃了%s' % (name, food))
#             time.sleep(random.randint(1,3))
#         except EOFError:
#             con.close()
#             break
#
# # 简单的一对一
# if __name__ == '__main__':
#     con,pro = Pipe()
#     p = Process(target=producer,args=(con,pro,'Jinhua','火腿'))
#     p.start()
#     c = Process(target=consumer, args=(con, pro, 'Lin'))
#     c.start()
#     con.close()
#     pro.close()


from multiprocessing import Pipe,Process,Lock
def producer(con,pro,name,food):
    con.close()
    for i in range(100):
        f = '%s生产%s%s'%(name,food,i)
        print(f)
        pro.send(f)
    pro.send(None)
    pro.send(None)
    pro.send(None)
    pro.close()

def consumer(con,pro,name,lock):
    pro.close()
    while True:
        lock.acquire()    # 加锁 避免多个进程拿同一个数据
        food = con.recv()
        lock.release()
        if food is None:
            con.close()
            break
        print('%s吃了%s' % (name, food))


if __name__ == '__main__':
    con,pro = Pipe()
    lock= Lock()
    p = Process(target=producer,args=(con,pro,'Jinhua','火腿'))
    c1 = Process(target=consumer, args=(con, pro, 'Lin',lock))
    c2 = Process(target=consumer, args=(con, pro, 'Wang',lock))
    c3 = Process(target=consumer, args=(con, pro, 'Zhao',lock))
    c1.start()
    c2.start()
    c3.start()
    p.start()
    con.close()
    pro.close()


# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免多个进程之间争抢数据造成的数据不安全现象

# 队列 进程之间数据安全的
# 队列 = 管道 + 锁
# 所以,用队列比较方便,管道是更底层的,还需要自己加锁

进程间的数据共享  Manager

# from multiprocessing import Manager,Process

# def main(dic):
#     dic['count'] -= 1
#     print(dic)
#
# if __name__ == '__main__':
#     m = Manager()
#     dic=m.dict({'count':100})
#     p_lst = []
#     p = Process(target=main, args=(dic,))
#     p.start()
#     p.join()
#     print('主进程', dic)


from multiprocessing import Manager,Process,Lock

def main(dic,lock):
    lock.acquire()
    dic['count'] -= 1
    lock.release()


if __name__ == '__main__':
    m = Manager()   # 进程间的数据共享
    l = Lock()
    dic=m.dict({'count':100})
    p_lst = []
    for i in range(50):
        p = Process(target=main,args=(dic,l))
        p.start()
        p_lst.append(p)
    for i in p_lst: i.join()  # 等待子进程结束
    print('主进程',dic)

进程池

# 为什么会有进程池的概念
    # 效率
    # 每开启进程,开启属于这个进程的内存空间
    # 寄存器 堆栈 文件
    # 进程过多 操作系统的调度

# 进程池
    # python中的 先创建一个属于进程的池子
    # 这个池子指定能存放n个进程
    # 先将这些进程创建好  进程超过5个建议使用进程池

# 更高级的进程池 比如动态增加的 python中没有
    # n,m
    # 3   三个进程
    #     + 进程
    # 20  20个


# import time
# from multiprocessing import Pool,Process
# def func(n):
#     for i in range(5):
#         print(n+1)
#
# if __name__ == '__main__':
#     pool = Pool(5)              # 5个进程
#     pool.map(func,range(100))    # 100个任务


import time
from multiprocessing import Pool,Process
def func(n):
    for i in range(10):
        print(n+1)

if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)               # 5个进程
    pool.map(func,range(100))    # 100个任务
    t1 = time.time() - start

    start = time.time()
    p_lst = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p_lst.append(p)
        p.start()
    for p in p_lst :p.join()
    t2 = time.time() - start
    print(t1,t2)

## 进程池的好处就是用少的进程干更多的活。一般创建CPU+1个进程

# import time
# from multiprocessing import Pool,Process
# def func(n):
#     for i in range(10):
#         print(n+1)
#
# def func2(n):
#     n[0]
#     n[1]
#     for i in range(10):
#         print(n+2)

# if __name__ == '__main__':
#     start = time.time()
#     pool = Pool(5)               # 5个进程
#     pool.map(func,range(100))    # 100个任务
#     pool.map(func2,[('alex',1),'egon']) # map第二个参数必须是可迭代, 异步且自带join方法
#     t1 = time.time() - start
#
#     start = time.time()
#     p_lst = []
#     for i in range(100):     # 起100个进程的时候,效率反而降低了。
#         p = Process(target=func,args=(i,))
#         p_lst.append(p)
#         p.start()
#     for p in p_lst :p.join()
#     t2 = time.time() - start
#     print(t1,t2)

进程池中的 apply_async 异步执行

import os
import time
from multiprocessing import Pool

def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s' % n,os.getpid())

if __name__ == '__main__':
    p = Pool(5)   # 一般起cpu +1 个进程
    for i in range(10):
        # p.apply(func,args=(i,))       # 同步方式执行
        p.apply_async(func,args=(i,))   # 异步
    p.close()  # 结束进程池接收任务
    p.join()   # 感知进程池中的任务执行结束

进程池的返回值 

# p = Pool()
# p.map(funcname,iterable)   默认异步的执行任务,且自带close和join
# p.apply    同步调用的
# p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join


# from multiprocessing import Pool
#
# def func(i):
#     return i**2
#
# if __name__ == '__main__':
#     p = Pool(5)
#     for i in range(10):
#         res = p.apply(func,args=(i,))
#         print(res)     # apply的结果就是func的返回值


# import time
# from multiprocessing import Pool
#
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     for i in range(10):
#         res = p.apply_async(func,args=(i,))   # apply的结果就是func的返回值
#         print(res.get())      # get 等着 func的计算结果 导致阻塞


# import time
# from multiprocessing import Pool
#
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     res_l = []
#     for i in range(10):
#         res = p.apply_async(func,args=(i,))   # apply的结果就是func的返回值
#         res_l.append(res)
#     for res in res_l:print(res.get())    # 每组5个出现


import time
from multiprocessing import Pool
def func(i):
    time.sleep(0.5)
    return i*i

if __name__ == '__main__':
    p = Pool(5)
    ret = p.map(func,range(10))  # map 自带join 和 close
    print(ret)      # 得到list

进程池的回调函数

# 回调函数
# import os
# from multiprocessing import Pool
#
# def func1(n):
#     print('in func1',os.getpid())
#     return n*n
#
# def func2(nn):
#     print('in func2',os.getpid())
#     print('func2 res:', nn)
#
# if __name__ == '__main__':
#     print('主进程 :',os.getpid())
#     p = Pool(5)
#     p.apply_async(func1,args=(10,),callback=func2)  # 回到主进程中执行
#     p.close()
#     p.join()



import os
from multiprocessing import Pool

def func1(n):
    print('in func1',os.getpid())
    return n*n

def func2(nn):
    print('in func2',os.getpid())
    print('f2 结果:', nn)

if __name__ == '__main__':
    print('主进程 :',os.getpid())
    p = Pool(5)
    for i in range(10):  # 多个子进程
        p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
原文地址:https://www.cnblogs.com/frx9527/p/multiprocessing.html