001---进程

进程

什么是进程

进程:正在进行的一个过程或者一个任务,执行任务的是cpu。

进程与程序的区别

程序仅仅是一堆代码而已,而进程指的是程序的运行过程。
比喻:食谱就是程序,而进程就是做这道菜的过程:阅读食谱、放料、等。

并发与并行

无论是并发还是并行,在用户看起来都是同时运行的。不管是进程还是线程,都只是一个任务而已,而真正执行任务的是cpu。cpu同一时刻只能执行一个任务。

  • 并发(具备处理多个任务的能力):伪并行,看起来是同时。单个cpu + 多道技术就可以实现并发。
    你是一个cpu,你同时谈了三个女朋友,每一个都可以是一个恋爱任务,你被这三个任务共享。
    要玩出并发恋爱的效果,应该是你先跟女友1去看电影,看了一会说:不好,我要拉肚子,然后跑去跟第二个女友吃饭,吃了一会说:那啥,我
    去趟洗手间,然后跑去跟女友3开了个房
    
  • 并行(具备同时处理多个任务的能力):同时运行,只有具备多个cpu才能实现并行。

区别:一个是同一时间段、一个是同一时刻。

进程的状态

  • 就绪:开始一个新的任务

  • 运行:正在运行这个任务

  • 挂起:这个任务遇到IO就阻塞。然后让出cpu,给其他任务去执行。

开启进程的两种方式

注意:windows中Process()一定要在if name == 'main'下

  • 普通方式

    def func1(title):
        print(title)
    
    
    if __name__ == '__main__':
        p1 = Process(target=func1, args=('子进程1',))
        p2 = Process(target=func1, args=('子进程2',))
        p3 = Process(target=func1, args=('子进程3',))
        p1.start()
        p2.start()
        p3.start()
        print('主进程')
    
  • 类创建

    from multiprocessing import Process
    
    
    class MyProcess(Process):
    
        def __init__(self, title):
            super(MyProcess, self).__init__()
            self.title = title
    
        def run(self):
            print(self.title)
    
    
    if __name__ == '__main__':
        p1 = MyProcess('子进程1')
        p2 = MyProcess('子进程2')
        p3 = MyProcess('子进程3')
        p1.start()
        p2.start()
        p3.start()
        print('主进程')
    
    

进程的相关方法和属性

  • p.start():启动进程,调用子进程的run()

  • p.terminate():强制终止进程p,谨慎使用

  • p.is_alive():判断进程是否存活,布尔值

  • p.join():感知一个子进程的结束,将异步改为同步

    import time
    from multiprocessing import Process
    
    
    def func(arg1, arg2):
        print('*' * arg1)
        time.sleep(5)
        print('*' * arg2)
    
    
    if __name__ == '__main__':
        p = Process(target=func, args=(10, 20))
        p.start()
        print('开启子进程')
        p.join()  # 感知一个子进程的结束,将异步的程序改为同步
        print('子进程都运行完了')
    

    疑问:必须明确,join是让谁等,是让主进程等子进程p.start()后。子进程就已经并发执行了。当有多个join时,四个join花费的时间也是耗费时间最长的那个子进程

  • p.daemon = True:默认false,设为true

    import time
    from multiprocessing import Process
    
    
    def task(name):
        print('%s is piao ing' % name)
        time.sleep(3)
        print('%s is piao end' % name)
    
    
    if __name__ == '__main__':
        p = Process(target=task, args=('egon',))
        p.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
        p.start()
        time.sleep(1)   # 子进程只执行一会
        # time.sleep(5) # 子进程会执行完毕
        print('主')     # 只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
    

    代表p为守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

    比喻:皇帝身边的老太监:皇帝驾崩,老太监跟着死。

  • p.name:进程的名称

  • p.pid:进程的pid

进程间的数据隔离问题

import os, time
from multiprocessing import Process


def func():
    global n
    n = 0
    print('子--start:%s' % os.getpid(), n)


if __name__ == '__main__':
    n = 100
    print('主--start:%s' % os.getpid(), n)
    p = Process(target=func, )
    p.start()
    time.sleep(3)
    print('尽管使用了全局变量n, 但是不会修改n的值,依然是 % d' % n)
    print('主--end:%s' % os.getpid(), n)
    
"""
主--start:15704 100
子--start:7076 0
尽管使用了全局变量n, 但是不会修改n的值,依然是  100
主--end:15704 100
"""

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

同一终端打印

  • 未加锁

    # 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
    from multiprocessing import Process
    import os, time
    
    
    def work():
    print('%s is running' % os.getpid())
    time.sleep(2)
    print('%s is done' % os.getpid())
    
    
    if __name__ == '__main__':
    for i in range(3):
    p = Process(target=work)
    p.start()
    
    """
    13924 is running
    13576 is running
    10080 is running
    13924 is done
    13576 is done
    10080 is done
    """
    
  • 加锁

  # 由并发变成了串行,牺牲了运行效率,但避免了竞争
  from multiprocessing import Process, Lock
  import os, time


  def work(lock):
  lock.acquire()
  print('%s is running' % os.getpid())
  time.sleep(2)
  print('%s is done' % os.getpid())
  lock.release()


  if __name__ == '__main__':
  lock = Lock()
  for i in range(3):
  p = Process(target=work, args=(lock,))
  p.start()

  """
  3964 is running
  3964 is done
  14000 is running
  14000 is done
  11592 is running
  11592 is done
  """

模拟抢票

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "ziya"
# Date: 2018-09-07

import json, time
from multiprocessing import Process
from multiprocessing import Lock


def show(i):
    with open('ticket') as f:
        ticket = json.load(f).get('ticket')
    print('余票为:%s' % ticket)


def buy(i, lock):
    # 自动加锁和解锁
    with lock:
        with open('ticket') as f:
            ticket = json.load(f).get('ticket')
        time.sleep(0.1)

        if ticket:
            ticket -= 1
            print('%s买到了票' % i)
        else:
            print('%s凉凉' % i)

        time.sleep(0.1)
        with open('ticket', 'w') as f:
            json.dump({"ticket": ticket, }, f)


if __name__ == '__main__':

    for i in range(10):
        p = Process(target=show, args=(i,))
        p.start()

    lock = Lock()
    for i in range(10):
        p = Process(target=buy, args=(i, lock))
        p.start()

总结:加锁能保证多个进程修改同一个数据时,同一时间只能有一个任务可以进行修改,虽然牺牲了效率,但保证了数据安全。

缺点

  • 效率低
  • 需要手动进行加锁处理

所以我们需要更好的方式来解决此问题:队列和管道。都是将数据存放于内存中。

队列queue(推荐使用)

def produce(q):
    q.put('hello')

def consume(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume,args=(q,))
    c.start()


"""
  q = Queue() # 省略则无大小限制,存放的不是数据大小。而且最大项数,但是也受限于内存大小。
  q.put():放,满了会阻塞
  q.get():拿:空了会阻塞
  q.get_nowait():当队列为空时,不会等待。直接跑出异常,可以捕捉。
  q.full():判断是否满了
  q.empty():判断是否空了
  
"""

生产者消费者模型

为什么要有这个模型

  • 平衡生产者和消费者之间的速度差
  • 程序之间解耦

JoinableQueue实现生产者消费者模型

from multiprocessing import Process, JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

        q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    q.join()  # 阻塞。感知一个队列中的数据,全部被执行完毕


if __name__ == '__main__':
    q=JoinableQueue()
    
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #开始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('主') 
    
    #主进程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程

进程池

为什么有进程池的概念

不可能开启无限进程,一般有几个核就开几个。否则开启过多进程会造成系统调度变慢、效率反而会下降所以创建一个属于子进程的池子,对进程数加以控制

from multiprocessing import Pool
p = Pool(5)  # 默认使用os.cpu_count(),一旦创建,至始至终就这5个进程交替,不会开启其他进程
p.close()    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
p.join()     # 等待所有工作进程退出。此方法只能在close()或teminate()之后调用

方法

  • p.map(func, range(100)):自带join
  • p.apply_async(func, args, kwargs) :异步调用,p.close()、p.join()
  • p.apply(func, ):同步调用,直到本次任务拿到返回值,或者结束

返回值

  • p.apply():同步的,直接取得结果,就是func的返回值
  • p.apply_async(func,args,kwargs):异步的,需要p.get()

回调函数

回调函数:可以为每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发
并接收任务的返回值当作参数。

  • p.apply_async(get, args=(url,), callback=func)func是主进程执行的

多进程实现socket服务端的并发

服务端

import socket
from multiprocessing import Process

ip_port = ('127.0.0.1', 8001)
buffsize = 1024


def pro(conn):
    while True:
        try:
            data = '>>>: 你好'
            print('准备给客户端发送消息', data)
            conn.send(data.encode('utf-8'))
            msg = conn.recv(buffsize).decode('utf-8')
            print('收到客户端的消息', msg)
        except Exception as e:
            break
    conn.close()


if __name__ == '__main__':

    server = socket.socket()
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(ip_port)
    server.listen(5)
    while True:
        conn, addr = server.accept()
        print(conn, addr)
        p = Process(target=pro, args=(conn,))
        p.start()
    server.close()

客户端

import socket

ip_port = ('127.0.0.1', 8001)
buffsize = 1024

client = socket.socket()

client.connect(ip_port)
while 1:
    data = client.recv(buffsize).decode('utf-8')
    print('收到服务器发来的消息', data)
    msg = input('>>>:')
    if not msg: continue
    print('准备给服务器发送消息', msg)
    client.send(msg.encode('utf-8'))

client.close()

问题

每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。
解决方法:进程池

进程池实现socket服务端并发

服务端

from socket import *
from multiprocessing import Pool
import os


def talk(conn,client_addr):
    print('进程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    p=Pool()
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr)) # 同步的话,则同一时间只有一个客户端能访问

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
原文地址:https://www.cnblogs.com/xjmlove/p/10391713.html