进程编程

进程与程序的区别

程序只是一堆代码,进程指的是程序的运行过程

并发与并行的区别

比如一台计算机有4个CPU,然后分别运行了4个程序,这种叫做并行。

当一个CPU要运行4个程序,叫并发。

如何实现单核的并发效果:

例如:某人正在做饭,洗衣服,拖地,首先他要先烧水,在烧的过程中,他决定去洗衣服,把衣服放到洗衣机里面,打开开关后,水

还没有烧开于是他又去拖了个地,拖到半途中,水开了,他接着去做饭。。。。。用等待时间去干别的事情,是有某人决定,相对于

计算机来说,操作系统便实现协调管理、调度进程,并且将多个进程对硬件的竞争变得有序。

同步异步

同步和异步关注的是消息通信机制
所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。
换句话说,就是由调用者主动等待这个调用的结果。

而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在*调用*发出后,*被调用者*通过状态、通知来通知调用者,或通过回调函数处理这个调用。

一、进程是如何创建的
1、系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的
进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程)
2、一个进程在运行过程中开启子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
3、用户的交互式请求,而创建一个新进程(如双击QQ)
4、一个批处理作业的初始化(只在大型机的批处理系统中应用)
无论哪一种,进程的创建都是向操作系统发起的请求,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。
 
二、进程的终止
1、正常退出(点击X)
2、出错退出(python a.py,a.py不存在)
3、严重错误(如引用不存在的内存)
4、被其他进程杀死
 

 python下如何创建进程

#方法一
from multiprocessing import Process
import time
def work(name):
    print('task <%s> is runing'%name)
    time.sleep(2)
    print('task <%s> is done'%name)

#由于产生的两个子进程共享终端,所以会发生争抢
if __name__ == '__main__':#创建进程必须在main中
    p1 = Process(target=work,args=('wate',))
    p2 = Process(target=work, kwargs={'name':'egon'},))
    p1.start()#开启进程
    p2.start()#开启进程
    print('')
#方法二
from  multiprocessing import Process
import time

class MyProcess(Process):#继承Process类
    def __init__(self,name):
        super().__init__()#继承父类的init方法
        self.name = name

    def run(self):#这种开启子进程的方法必须要有run方法
        print('task <%s> is runing' % self.name)
        time.sleep(2)
        print('task <%s> is done' % self.name)

if __name__ == '__main__':
    p1 = MyProcess('wate')
    p1.start()#执行MyProcess.run方法
    p2 = MyProcess('egon')
    p2.start()
    print('')

将无并发的套接字通过进程实现并发

#服务端
from socket import *
from multiprocessing import Process

s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)#端口复用
s.bind(('127.0.0.1',8080))
s.listen(5)


def walk(conn,addr):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()

if __name__ == '__main__':
    while True:
        conn,addr = s.accept()
        #开启子进程,没接收到一个链接就交给子进程取循环收发消息
        p = Process(target=walk,args=(conn,addr))
        p.start()
    s.close()
#客户端
from  socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))

while True:
    msg = input('>>>')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))

进程的join方法

import time
from multiprocessing import Process
def walk(name):
    print('task <%s> is runing'%name)
    time.sleep(2)
    print('task <%s> is done'%name)

if __name__ == '__main__':
    p1 = Process(target=walk,args=('egon',))
    p2 = Process(target=walk,args=('egon',))
    p3 = Process(target=walk,args=('egon',))
    p_l = [p1,p2,p3]
    for p in p_l:
        p.start()
    for p in p_l:
        p.join()#主进程等所有子进程运行结束后才会执行

    print('')

关于进程的其他方法

p1.terminate()#终止进程,不建议使用(当子进程又产生子进程时,执行这个会产生僵尸进程)

p1.is_alive()#进程是否存活

p1.pid #p1对象产生进程的pid,也可在子进程中使用os.getpid()查看子进程的pid,os.getppid()查看父id

守护进程 

1、守护进程会在主进程的代码执行结束后终止

2、守护进程中无法再产生子进程

from multiprocessing import Process
import time
def work(name):
    print('------>%s'%name)
    time.sleep(2)
    print('=====>%s'%name)
if __name__ == '__main__':
    p1 = Process(target=work,args=('egon',))
    p1.daemon = True#守护进程
    p1.start()
    print('')
#运行结果:主进程执行完成,守护子进程还未执行,便结束了

 同步锁

from multiprocessing import Process,Lock
import time
#运行效率使用并发,不加锁
#保证有序只能串行,加锁
def work(name,mutex):
    mutex.acquire()#加锁
    print('task <%s> is runing'%name)
    time.sleep(2)
    print('task <%s> is done'%name)
    mutex.release()#解锁


if __name__ == '__main__':#创建进程必须在main中
    mutex= Lock()
    p1 = Process(target=work,args=('wate',mutex))
    p2 = Process(target=work, args=('egon',mutex))
    p1.start()
    p2.start()
    print('')

模拟抢票

db.txt

{"count": 0}

购票.py

import json
import os
import time
from multiprocessing import Process,Lock

def search():
    dic = json.load(open('db.txt'))
    print('33[32m [%s]  看到剩余票数[%s]33[0m' %(os.getpid(),dic['count']))


def get_ticket():
    dic = json.load(open('db.txt'))
    time.sleep(0.5)#模拟网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.5)#模拟写数据库的网络延迟
        json.dump(dic,open('db.txt','w'))
        print('33[31m%s购票成功33[0m'%os.getpid())

def task(mutex):
    search()#查的时候可以并发查
    mutex.acquire()#加锁
    get_ticket()#买的时候只能一个人买
    mutex.release()#解锁

if __name__ == '__main__':
    mutex= Lock()
    for i in range(10):
        p = Process(target=task,args= (mutex,))
        p.start()

 实现进程之间的通信

由于进程之间是相互隔离的,所以需要借助别的介质来实现进程之间的交互

基于队列QUEUE实现(队列是管道加锁实现的)。

from multiprocessing import Queue
q = Queue(3)#创建一个队列,可以放3个元素
q.put('one')#可以放任何python类型
q.put('two')
q.put('three')
# q.put('four',block=True)#默认block = True,队列满时等待
#q.put_nowait('fourth')
print(q.get())
print(q.get())
print(q.get())
# print(q.get())#队列空时等待
print(q.get(timeout=3))#队列空时等待3秒

 基于队列实现生产者消费者模型

from multiprocessing import Process, JoinableQueue
import time, os


def producer(q, name):
    for i in range(3):
        time.sleep(1)
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
    q.join()#等待q.task_done()取完后返回主程序

def consumer(q):
    while True:
        res = q.get()
        time.sleep(1.5)
        print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
        q.task_done()#取完一次给生产者发一次信号

if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q, '包子'))
    p2 = Process(target=producer, args=(q, '饺子'))
    p3 = Process(target=producer, args=(q, '馄饨'))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    c1.daemon=True#主程序运行完即生产者知道消费者结束
    c2.daemon=True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('')

进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  1. 很明显需要并发执行的任务通常要远大于核数
  2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

from multiprocessing import Pool
import os,time

def work(n):
    print('task <%s> is runing'%os.getpid())
    time.sleep(2)
    return n**2

if __name__ == '__main__':
    p = Pool(4)#,进程池产生4个进程执行10次任务
    res_l = []
    for i in range(10):
        res = p.apply(work,args=(i,))#同步,一个进程执行之后才会执行下一个
        print(res)

上面的方式由于是同步的串行的,所以一个进程执行之后才会执行下一个,不建议使用,下面介绍一种基于异步的进程池的方式。

from multiprocessing import Pool
import os,time

def work(n):
    print('task <%s> is runing'%os.getpid())
    time.sleep(2)
    return n**2

if __name__ == '__main__':
    p = Pool(4)#,进程池产生4个进程执行10次任务
    res_l = []
    for i in range(10):
        # res = p.apply(work,args=(i,))#同步,一个进程执行之后才会执行下一个
        # print(res)
        res = p.apply_async(work,args=(i,))#异步,只把值抛给work不管返回结果
    p.close()#不在给进程池添加新任务
    p.join()#等待所有进程执行完成
    for res in res_l:
        print(res.get())

异步处理,会将进程全部抛给执行的功能函数中,利用p.close(),p.join(),等待并发的任务执行之后获取结果。

回调函数

from multiprocessing import Pool
import os,time

def work(n):
    print('task <%s> is runing'%os.getpid())
    time.sleep(2)
    return n**2

def show(res):
    print(res)
if __name__ == '__main__':
    p = Pool(4)#,进程池产生4个进程执行10次任务
    for i in range(10):
        res = p.apply_async(work,args=(i,),callback=show)#异步,只把值抛给work不管返回结果
    p.close()#不在给进程池添加新任务
    p.join()#等待所有进程执行完成

回调函数:异步处理我们不必等到所有任务都执行结束在获取结果,回调函数会将结果传入回调函数中进行处理,即进程每执行完一条任务后,会将结果传入指定回调函数中,在进行处理,

对于执行时间不一致的任务,减少等待时间。

原文地址:https://www.cnblogs.com/kunixiwa/p/7440433.html