并发编程之多进程

一、进程介绍

'''
进程:
    进程就是一个正在运行的程序,换言之,进程指的是程序的运行过程。进程时正在运行程序的一个抽象
    
程序VS进程:
    程序:是一堆存放于硬盘上的静态文件
    进程:程序的运行的过程才是进程
串行:
    一个任务完完整整的运行完毕,再执行下一个任务,按次序依次执行
    仅存在一个运行上下文。(即一个调用栈,一个堆)
    判断题:
        串行看起来就是一个一个运行的的  对
        一个一个的运行就是串行         错   # 按次序依次运行才是串行
并发:
    CPU切换+保存状态,多个任务看起来是同时运行(不是真正的同时,而是看来是同时,因为cpu要在多个程序间切换)
    本质就是一个物理CPU(也可以是多个物理CPU)在若干道程序之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率
并行:
    多个任务是真正意义上的同时运行,只有多核才可以实现并行
    
多道技术:
    1、产生背景: 针对单核,实现并发
        现在服务器都是多核,那么每个核都会利用多道技术
        有4个CPU,运行CPU1的某个程序遇到IO阻塞,会等到IO结束后再重新调度,会被调度到4个CPU中的任意一个,具体由操作系统调度算法决定
    2、空间上的复用:如内存中同时有多个程序
    3、时间上的复用:复用一个CPU时间片
        遇到IO阻塞切,占用CPU时间过长也会切,核心在于切之前将进程的状态保存下来,这样才能保证下次切回来时,能基于上次切走的位置继续运行
    两种情况下CPU会发生切换:
      一个任务占用CPU时间过长(没有遇到IO操作) 会降低效率
      一个任务在运行的过程中遇到IO操作 可以提升效率,
'''

二、同步/异步  and 阻塞/非阻塞

'''
同步
    是在发出一个功能调用时,在没有得到结果前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用的,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成任务

异步
    异步的概念和同步相对。
    当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。
    如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率很低
    如果使用通知的方式,效率很高,因为异步功能几乎不需要做额外的操作,等着通知就行
阻塞
    阻塞是指调用结果返回之前,当前线程会被挂起(如遇到IO操作)。函数只有在得到结果之后才会将阻塞线程激活。
    有人也许会把阻塞和同步调用等同起来,实际上是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已
    举例:
        同步调用:一个程序有一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞(即便是被抢走CPU的执行权限,那也是处于就绪状态)
        阻塞调用:当socket工作在阻塞模式时,如果没有数据的情况下调用recv函数,则当前线程会被挂起,直到有数据为止

非阻塞
    非阻塞和阻塞概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程
'''

三、multiprocessing模块介绍

'''
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
'''

四、Process类介绍

from multiprocessing import Process

Process(group=None, target=None, name=None, args=(), kwargs={})    # 通过Process类实例化得到对象,表示一个子进程中的任务(未启动)

'''
强调:
    需要使用关键字的方式指定参数
    args指定的为传给target函数的位置参数,是元组形式,必须要有逗号
参数介绍:
    group  参数未使用,值始终是None
    target 调用对象,即子进程要执行的任务
    name  子进程的名称
    args表示调用对象的位置参数元组,args=('linux','python',)
    kwargs表示调用对象的字典,kwargs={'os'='linux','version':7}
 
方法介绍:
p=Process(target=task,name='child')
    p.start() 启动子进程,并调用该子进程中的p.run()
    p.run()  进程启动时运行的方法,正是它去调用target指定的函数,自定义类中一定要实现该方法
    p.terminate() 强制终止进程p 不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况,如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive() 如果p还在运行,则返回Ture
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间
    需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
    
属性介绍:
    p.daemon  默认为False,如果设置为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止并且设置为True后,p不能创建自己的新进程,必须在p.start()之前设置
    p.name    进程名称
    p.pid     进程号
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功
'''

五、创建子进程

'''
在windows中Process()必须放到# if __name__ == '__main__':下  以下是官方解释
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
'''
#######################方式1#################################
from multiprocessing import Process
import time
import os


def task(name):
    print('%s is running' % name)
    time.sleep(2)
    print('%s is done' % name)


if __name__ == '__main__':
    p = Process(target=task, args=('child',))
    p.start()  # p.start()仅仅只是向操作系统发了一个创建子进程p的信号
    print('%s is %s' % (p.pid, p.name))
    print('init(%s) is done' % os.getpid())

'''
执行结果:
    8912 is Process-1       # 此时子进程p还在创建中
                            # 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)
                            在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程
    init(536) is done       # 到此时,子进程p还没创建出来,先运行了主进程的代码,主进程进入阻塞态
    child is running        # 子进程创建完成,运行子进程代码 
    child is done           # 子进程运行完毕,主进程退出
'''
from multiprocessing import Process
import time
import os


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running' % self.name)
        time.sleep(2)
        print('%s is done' % self.name)


if __name__ == '__main__':
    p = MyProcess('child')
    p.start()  # p.start()仅仅只是向操作系统发了一个创建子进程p的信号
    print('%s is %s' % (p.pid, p.name))
    print('init(%s) is done' % os.getpid())
方式2
import socket
from multiprocessing import Process

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9080))
server.listen(5)


def service(connect, client):
    while True:
        try:
            msg = connect.recv(1024)
            if not msg: break
            print('收到来自客户端(%s:%s)的消息: %s' % (client[0], client[1], msg))
            connect.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':
    while True:
        connect, client = server.accept()
        p = Process(target=service, args=(connect, client))
        p.start()
socket并发通信之服务端
#!/usr/bin/env python
# -*- coding: utf-8 -*-  
# author: Fred_li

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 9080))

while True:
    msg = input('请输入要发给服务端的消息: ').strip()
    if len(msg) == 0: continue
    client.send(msg.encode('utf8'))
    data = client.recv(1024)
    print('收到来自服务端的消息:%s' % data)
socket并发通信之客户端

六、join()方法

from multiprocessing import Process

import time

import os


def task(name):
    print('%s is running' % name)
    time.sleep(2)
    print('%s is done' % name)


if __name__ == '__main__':
    p = Process(target=task, args=('child',))
    p.start()
    p.join()  # 1、必须要在start()之后  2、让主进程等待子进程p执行完毕后,再执行主进程后续的代码
    print('%s(%s) is over' % (p.name, p.pid))
    print('init(%s) is over' % os.getpid())

'''
执行结果:
    child is running
    child is done
    Process-1(7420) is over
    init(9352) is over
'''
from multiprocessing import Process
import time
import os
import random


def task(name):
    print('%s is running' % name)
    time.sleep(random.randint(1,3))
    print('%s is done' % name)


if __name__ == '__main__':
    p1 = Process(target=task, name='child_process1', kwargs={'name': 'child_process1'})
    p2 = Process(target=task, name='child_process2', kwargs={'name': 'child_process2'})
    p3 = Process(target=task, name='child_process3', kwargs={'name': 'child_process3'})
    start=time.time()
    p1.start()
    p2.start()
    p3.start()
    p1.join()                # join()方法是让主进程等子进程执行完,多个子进程是并发执行
    p2.join()
    p3.join()
    stop=time.time()
    print('init(%s) is done' %os.getpid(),stop-start)

'''
执行结果:
    child_process1 is running
    child_process2 is running
    child_process3 is running
    child_process3 is done
    child_process1 is done
    child_process2 is done
    init(7308) is done 3.1593379974365234
'''
多个子进程+join

七、进程之间内存空间是隔离的

from multiprocessing import Process

name = 'fred'


def task():
    global name
    name = 'fred_li'
    print('%s is 子进程的值' % name)


if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print('%s is 主进程的值' % name)

'''
执行结果:
    fred_li is 子进程的值
    fred is 主进程的值
    # 实验证明进程之间的内存空间是隔离的
'''

八、孤儿进程和僵尸进程

'''
在Linux/unix中,正常情况下,子进程是通过父进程创建的,子进程再创建新的进程,子进程的结束和父进程的运行是一个异步过程。
即父进程永远无法预测子进程什么时候结束,当一个进程完成它的工作终止之后,它的父进程需要调用wait()或者waitpid()系统调用取得子进程的终止状态

孤儿进程:
    一个父进程退出,而它的一个或多个子进程还在运行,那么这些子进程将成为孤儿进程。孤儿进程将被init进程(pid为1)所收养,并由init进程对它们的状态完成收集工作
    孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作
    每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程
    这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。

僵尸进程:
    一个进程使用fork创建子进程,如果子进程退出,而父进程没有调用wait()或者waitpid()获取子进程的状态信息,那么子进程的进程描述符仍然存在于系统中,这种进程称之为僵尸进程
    因此,UNⅨ提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:
        1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。
            但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)
        2、直到父进程通过wait / waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用
            但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。

    任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。
    这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。
    如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 
    如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。
'''

僵尸进程及孤儿进程详解,以及僵尸进程的处理方法

九、守护进程

'''
守护进程
    本质就是一个"子进程",会在主进程运行完毕(指的是主进程代码运行完毕)的情况下跟着一起结束

特点:
    是一个"子进程"
    守护进程内的任务会在父进程完毕的情况下直接死掉
    守护进程无法再开启子进程,负责抛出异常AssertionError: daemonic processes are not allowed to have children
'''

from multiprocessing import Process

import time
import os


def service(name):
    print('%s is running' % name)
    time.sleep(2)
    print('%s is done' % name)


if __name__ == '__main__':
    p = Process(target=service, args=('child1',))
    p.daemon = True            # p.daemon=True必须要在p.start()之前
    p.start()
    print('init(%s) is done' % os.getpid())

'''
执行结果:
   init(13532) is done     # 主进程已经运行到最后代码了,子进程p还没创建出来,所以跟着主进程一起终止 
'''

十、互斥锁

'''
互斥锁:
    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
    而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
    某个进程要共享数据时,先将其锁定,此时资源的状态为‘锁定’,其他进程不能更改;
    直到该进程释放资源,将资源的状态变成‘非锁定’,其他的进程才能再次锁定该资源。
互斥锁保证了每次只有一个进程进入写入操作,从而保证了多进程情况下数据的正确性。

互斥锁的原理:
    将多个进程对共享数据修改的操作由并发变成‘串行’(并不是真正的串行,因为它不是依次执行,谁先抢到锁,谁就拥有修改数据的权利),牺牲了效率保证了数据安全
'''
from multiprocessing import Process

import json
import time
import random


def search(i):
    with open('db.json', 'rt', encoding='utf8') as f:
        dic = json.load(f)
        print('路人%s 查看到的余票数为: %s ' % (i, dic['count']))


def buy_ticket(i):
    with open('db.json', 'rt', encoding='utf8') as f:
        dic = json.load(f)

    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', 'wt', encoding='utf8') as f_w:
            json.dump(dic, f_w)
            print('路人%s 抢票成功' % i)
    else:
        print('路人%s 抢票失败' % i)


def task(i):
    search(i)
    buy_ticket(i)


if __name__ == '__main__':
    for i in range(1, 4):
        p = Process(target=task, args=(i,))
        p.start()

'''
执行结果:
    路人1 查看到的余票数为: 1 
    路人2 查看到的余票数为: 1 
    路人3 查看到的余票数为: 1 
    路人2 抢票成功
    路人1 抢票成功
    路人3 抢票成功  
# 我的天哪,多个子进程并发执行,造成数据错乱(1张票3个人竟然都抢成功了),此时就用到了互斥锁
'''
多个子进程并发执行导致数据错乱的情景

上面的情景加上互斥锁后,如下

from multiprocessing import Process, Lock

import json
import time
import random


def search(i):
    with open('db.json', 'rt', encoding='utf8') as f:
        dic = json.load(f)
        print('路人%s 查看到的余票数为: %s ' % (i, dic['count']))


def buy_ticket(i):
    with open('db.json', 'rt', encoding='utf8') as f:
        dic = json.load(f)

    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', 'wt', encoding='utf8') as f_w:
            json.dump(dic, f_w)
            print('路人%s 抢票成功' % i)
    else:
        print('路人%s 抢票失败' % i)


def task(i, mutex):
    search(i)
    mutex.acquire()  # 锁定
    buy_ticket(i)
    mutex.release()  # 释放


if __name__ == '__main__':
    mutex = Lock()
    for i in range(1, 4):
        p = Process(target=task, args=(i, mutex))
        p.start()

'''
执行结果:
    路人3 查看到的余票数为: 1 
    路人2 查看到的余票数为: 1 
    路人1 查看到的余票数为: 1 
    路人3 抢票成功
    路人2 抢票失败
    路人1 抢票失败
'''

 十一、进程间通信

'''
进程间通信的方式
    1、管道(pipe)及有名管道(name pipe):管道可用于具有亲缘关系的父子进程间的通信,有名管道除了具有管道所具有的的功能外,它还允许无亲缘关系进程间通信
    2、信号(signal):信号是在软件层次上对中断机制的一种模拟。它是比较复杂的通信方式,用于通知进程有某些事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说一致的
    3、消息队列(message queue):消息队列是消息的链接表,它克服了以上两种通信方式中信号量有限的缺点,具有写权限的进程可以按照一定的规则想消息队列中添加新消息,对消息队列有读权限的进程可以从消息队列中读取消息
    4、共享内存(shared memory):可以说这是最有用的进程间通信方式。它使多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据的更新。这种方式需要依靠某种同步操作,如互斥锁和信号量等
    5、信号量(semaphore):主要作为进程之间及同一种进程的不同线程之间的同步和互斥手段
    6、套接字(socket):这是一种更为一般的进程间通信机制,它可以用于网络中不同机器之间的进程通信,应用非常广泛
'''
'''
进程彼此间是互相隔离的,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
队列Queue:
    底层就是以管道+锁的方式实现的
'''

from multiprocessing  import Queue,Process

# 创建队列的类
q=Queue(maxsize=5)  # 创建共享的进程队列,Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据的传递
    # maxsize是队列中最大能够的接收的消息数 一个队列,不能无限制的存储。毕竟,内存是有限制的。

q.put()  # 用以插入数据到队列中,put方法还有两个可选的参数,blocked和timeout 如果block为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
      # 如果超时,会抛出Queue.Full异常
# 如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
q.get() # 可以从队列读取并且删除一个元素。同样,get方法有两个可选参数,blocked和timeout,如果block为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
# 如果blocked为False,有两种情况存在,
# 如果Queue有一个值可用,则理解返回改值
# 如果队列为空,则立即抛出Queue.Empty异常
q.get_nowait() # 等同于q.get(Flase) q.put_nowait() # 等同于q.put(False) q.empty() # 调用此方法时q为空则返回True,该结果不可靠。比如在返回True的过程中,如果队列又加入新项目 q.full() # 调用此方法时q已满则返回True,该结果不可靠。比如在返回True的过程中,如果队列的项目被取走 q.qsize() # 返回队列中目前项目的正确数量,该结果也不可靠,理由同q.empty()和q.full() # q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞 # q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。
# 如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
# q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用
# 调用q.cancel_join_thread方法可以禁止这种行为
from multiprocessing import Queue

if __name__ == '__main__':
    q=Queue(3)    # 创建队列
    q.put('消息1')
    q.put('消息2')
    q.put('消息3')
    print('消息已经满了,亲')

    # 从消息队列中读取数据,推荐两种方法: 1、捕获   2、判断

    try:
        q.put('消息4',block=True,timeout=2)         # 尝试写入消息4,如果满了,等待2秒抛出异常
    except:
        print('已经满了,现有消息%s条' %q.qsize())

    try:
        q.put_nowait('消息4')  # 尝试写入,如果满了立即抛出异常
    except:
        print('已经满了,现有消息%s条' % q.qsize())

    if not q.full():
        q.put('消息4')

    if not q.empty():
        for i in range(q.qsize()):
            print(q.get_nowait())
from multiprocessing import Process, Queue
import os


# 写入进程

def Write(q):
    print('%s开始写入' % os.getpid())
    for i in range(0, 5):
        # 将消息写入队列
        q.put('消息%s' % i)
        print('消息%s' % i)


def Read(q):
    print('%s开始读取' % os.getpid())
    while True:
        if not q.empty():
            # 从队列中读取消息
            print('read to %s' % q.get())
        else:
            break


if __name__ == '__main__':
    q = Queue()
    # 创建写进程
    wp = Process(target=Write, args=(q,))
    wp.start()
    wp.join()

    rp = Process(target=Read, args=(q,))
    rp.start()

'''
执行结果:
    14124开始写入
    消息0
    消息1
    消息2
    消息3
    消息4
    1000开始读取
    read to 消息0
    read to 消息1
    read to 消息2
    read to 消息3
    read to 消息4
    
'''
多进程通信Queue应用

 十二、生产者消费者模型

'''
生产者消费者模型
    在并发编程中使用生产者和消费者模式能够解决大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度

什么是生产者消费者模式
    通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯
    所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
    
为什么要使用生产者和消费者模式
    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢
    那么生产者必须要等消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者
    为了解决这个问题于是引入了生产者和消费者模式
'''
from multiprocessing import Process, Queue
import time, random, os, subprocess


def producer(q):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        command = subprocess.Popen('netstat -an |findstr 16422', shell=True, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        stdout = '[%s]%s' % (i, command.stdout.read().decode('gbk'))
        q.put(stdout)
        print('生产者(%s)第%s次产生数据:%s' % (os.getpid(), i, stdout), end='')


def consumer(q):
    while True:
        result = q.get()
        time.sleep(random.randint(1, 3))
        print('33[1;31m消费者(%s)拿走数据:%s33[0m' % (os.getpid(), result,), end='')


if __name__ == '__main__':
    q = Queue()
    # 生产者的子进程
    p1 = Process(target=producer, args=(q,))
    # 消费者的子进程
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()

'''
执行结果:
    生产者(11580)第0次产生数据:[0]  TCP    0.0.0.0:16422          0.0.0.0:0              LISTENING
    消费者(8452)拿走数据:[0]  TCP    0.0.0.0:16422          0.0.0.0:0              LISTENING
    生产者(11580)第1次产生数据:[1]  TCP    0.0.0.0:16422          0.0.0.0:0              LISTENING
    消费者(8452)拿走数据:[1]  TCP    0.0.0.0:16422          0.0.0.0:0              LISTENING
    生产者(11580)第2次产生数据:[2]  TCP    0.0.0.0:16422          0.0.0.0:0              LISTENING
    消费者(8452)拿走数据:[2]  TCP    0.0.0.0:16422          0.0.0.0:0              LISTENING
    
'''
基于Queue实现生产者消费者模型
'''
上面的情景,主进程永远也不会结束,原因是:生产者p1在生产完数据之后就结束了,但是消费者p2在取空队列里的数据之后,则一直处于死循环切卡在get()这一步

解决方法:让生产者在生产完租后一条数据后,往队列中发一个结束的信号,这样消费者在get到结束信号后,就可以break出循环
'''
from multiprocessing import Process, Queue
import time
import random
import os
import subprocess


def producer(q):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        command = subprocess.Popen(
            'netstat -an |findstr 16422',
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout = '[%s]%s' % (i, command.stdout.read().decode('gbk'))
        q.put(stdout)
        print('生产者(%s)第%s次产生数据:%s' % (os.getpid(), i, stdout), end='')
    q.put(None)  # 发送结束信号


def consumer(q):
    while True:
        result = q.get()
        if result is None:
            break  # 消费者收到生产者发送的结束信号,结束
        time.sleep(random.randint(1, 3))
        print('33[1;31m消费者(%s)拿走数据:%s33[0m' % (os.getpid(), result,), end='')


if __name__ == '__main__':
    q = Queue()
    # 生产者的子进程
    p1 = Process(target=producer, args=(q,))
    # 消费者的子进程
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
生产者在生产完毕之后,发送结束信号
from multiprocessing import Process, Queue
import time
import random
import os
import subprocess


def producer(q):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        command = subprocess.Popen(
            'netstat -an |findstr 16422',
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout = '[%s]%s' % (i, command.stdout.read().decode('gbk'))
        q.put(stdout)
        print('生产者(%s)第%s次产生数据:%s' % (os.getpid(), i, stdout), end='')



def consumer(q):
    while True:
        result = q.get()
        if result is None:
            break  # 消费者收到生产者发送的结束信号,结束
        time.sleep(random.randint(1, 3))
        print('33[1;31m消费者(%s)拿走数据:%s33[0m' % (os.getpid(), result,), end='')


if __name__ == '__main__':
    q = Queue()
    # 生产者的子进程
    p1 = Process(target=producer, args=(q,))
    # 消费者的子进程
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    q.put(None)          # 等p1结束,由主进程发送结束信号
由主进程发送结束信号
'''
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。    
方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
'''

from multiprocessing import Process, JoinableQueue
import time
import random
import os
import subprocess


def producer(q):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        command = subprocess.Popen(
            'netstat -an |findstr 5037',
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout = '[%s]%s' % (i, command.stdout.read().decode('gbk'))
        q.put(stdout)
        print('生产者(%s)第%s次产生数据:%s' % (os.getpid(), i, stdout), end='')
    q.join()  # 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止


def consumer(q):
    while True:
        result = q.get()
        if result is None:
            break  # 消费者收到生产者发送的结束信号,结束
        time.sleep(random.randint(1, 3))
        print('33[1;31m消费者(%s)拿走数据:%s33[0m' % (os.getpid(), result,), end='')
        q.task_done()  # 向q.join()发送一次信号,证明一个数据已经被取走了


if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者的子进程
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    # 消费者的子进程
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    p_l = [p1, p2, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    

'''
主进程等--->p1,p2,p3等---->c1,c2
    p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
'''
基于JoinableQueue([maxsize])实现多个生产者和消费者
原文地址:https://www.cnblogs.com/lichunke/p/9635877.html