生产者消费者模型及线程

队列

队列的特点是先进先出。

创建进程队列要用到Queue,它是多进程安全的队列,可以实现多进程之间的数据传递。

from multiprocessing import Queue

q = Queue(3)  #括号内可以穿参,表示的是这个队列的最大存储数
q.put(1)  # put是往队列中添加数据的方法
q.put(2)
q.put(3)
print(q.full())  # 判断队列是否满了
#q.put(4)  #当队列满了之后再添加数据不会报错,但会原地等待(阻塞态),直到队列中有数据被取走
print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # 判断队列中数据是否被取完
#print(q.get_nowait())  取值,如果没有值则不会等待而是直接报错
#print(q.get()  # 当队列中的数据被取完之后再次获取,程序也会原地等待(阻塞),直到有人往队列里放入值

##full,get_nowait,empty都不是用于多进程的情况

通过队列让进程间通信

from multiprocessing import Process,Queue
def producer(q):  # 向队列中放数据
    q.put('hello GF')

def consumer(q):  # 从队列中取数据
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))  # 进程p向队列中放数据
    c = Process(target=consumer,args=(q,)) #  进程c向队列中取数据 
    p.start()
    c.start()

生产者消费者模型

生产者:生产数据;消费者:消费数据

举个例子,生产者是做包子的,而消费者是买包子,当生产做的包子比消费者买的包子要多时就会出来供大于求的情况,而如果消费者要买的包子比生产者生产的多时,则会出现供不应求的情况。在程序中也会出现这样的问题,当生产数据的进程比消费数据的进程快和当消费数据的进程比生产数据的进程快时也会出现和上面买卖包子一样的问题,在程序中这两种情况都会影响程序对数据的处理速度,生产者消费者模型的概念就是为了平衡这个问题。

#建立一个吃包子模型
from multiprocessing import Process,Queue,JoinableQueue
import random
import time

def producer(name,food,q):
    for i in range(5):
        data = '%s生产力%s%s'%(name,food,i)
        time.sleep(random.random())
        q.put(data)
        print(data)

def consumer(name,q):
    while True:
        data = q.get()
        print('%s吃了%s'%(name,data))
        time.sleep(random.random())

if __name__ == '__main__':
    q = Queue()

    p = Process(target=producer,args=('egon','馒头',q))
    p1 = Process(target=producer,args=('tank','生蚝',q))
    c = Process(target=consumer,args=('龙胖',q))
    c1 = Process(target=consumer,args=('jerry',q))
    p.start()
    p1.start()
    c.start()
    c1.start()
#这个模型的问题在于队列本身的一个问题,当生产者生产完包子后就结束了,但消费者在取完包子之后,则进入了等待的状态,所以被卡死在了get()这一步
#解决的办法是可以在生产者生产完毕后,往队列再发一个结束信号,这样可以让消费者在接到这个信号后停止循环。
from multiprocessing import Process,Queue,JoinableQueue
import random
import time

def producer(name,food,q):
    for i in range(5):
        data = '%s生产力%s%s'%(name,food,i)
        time.sleep(random.random())
        q.put(data)
        print(data)

def consumer(name,q):
    while True:
        data = q.get()
        if data == None:break
        print('%s吃了%s'%(name,data))
        time.sleep(random.random())

if __name__ == '__main__':
    q = Queue()

    p = Process(target=producer,args=('egon','馒头',q))
    p1 = Process(target=producer,args=('tank','生蚝',q))
    c = Process(target=consumer,args=('龙胖',q))
    c1 = Process(target=consumer,args=('jerry',q))
    p.start()
    p1.start()
    c.start()
    c1.start()
    p.join()
    p1.join()
    q.put(None)
    q.put(None)

但是,上述方式在有多个生产者和多个消费者时会变得很麻烦,于是我们要用到一个新的东西JoinableQueue.

from multiprocessing import Process,Queue,JoinableQueue
import random
import time

def producer(name,food,q):
    for i in range(5):  # 生产五个
        data = '%s生产力%s%s'%(name,food,i)
        time.sleep(random.random())  # 定一个随机事件
        q.put(data)  # 将数据装入队列
        print(data)


def consumer(name,q):
    while True:
        data = q.get()  # 将数据取出队列
        print('%s吃了%s'%(name,data))
        time.sleep(random.random())
        q.task_done()  # 可以告诉队列你已经从队列中取出了一个数据,并处理完毕了

if __name__ == '__main__':
    q = JoinableQueue()

    p = Process(target=producer,args=('egon','馒头',q))  # 生产进程1
    p1 = Process(target=producer,args=('tank','生蚝',q))  # 生产进程2
    c = Process(target=consumer,args=('龙胖',q))  # 消费进程1
    c1 = Process(target=consumer,args=('jerry',q))  #消费进程2
    p.start()
    p1.start()
    c.daemon = True
    c1.daemon = True
    c.start()
    c1.start()
    p.join()
    p1.join()

    q.join()  # 等待队列中的数据全部取出来

线程理论

什么是线程?

线程就是进程的虚拟单位,都是用来帮助我们形象的描述某种事物。

进程:资源单位(资源分配的最小单位)

线程:执行单位(线程CPU调度的最小单位)

如果将内存比喻为工厂,那么进程就相当于工厂车间,而进程则是车间里的流水线,每个进程都自带一个线程(一般把这个自带的线程称为主线程),线程才是真正的执行单位,进程只是在线程运行过程中提供代码运行所需要的资源。

为什么要有线程:

开进程:1.申请内存空间,耗资源

    2."拷贝代码",耗资源

开线程:一个进程内可以起多个线程,并且线程与线程之间数据是共享的。(开线程的开销远远小于开进程的开销)

创建线程

threading模块

multiprocess模块是完全模仿threading模块的接口,所以两者在使用层面是非常相似的。

#用threading直接创建
from threading import Thread
import time

def task(name):
    print('%s is running'%name)
    time.sleep(3)
    print('%s is over'%name)
#开线程不需要在__main__代码块内,但习惯性的还是写在__main__代码块内
t = Thread(target=task,args=('egon',))
t.start()  # 告诉操作系统开辟一个线程
print('主进程')
#自己创建一个类然后继承
from threading import Thread
import time

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running'%self.name)
        time.sleep(3)
        print('%s is over'%self.name)

t = MyThread('egon')
t.start()
print('')

线程对象及其他方法

from threading import Thread,current_thread,active_count
import time
import os

def task(name,i):
    print('%s is running'%name)
    print('子current_thread:',current_thread().name) # 拿到属于的是主线程的第几个子线程
    print('',os.getpid())  # 拿到pid
    time.sleep(i)

    print('%s is over'%name)
t = Thread(target=task,args=('egon',1))
t1 = Thread(target=task,args=('jason',2))
t.start()
t1.start()
t.join()  # 主线程等待子线程运行完毕
print('当前正在活跃的线程数',active_count())  # 拿到现在还存在的进程数量
print('')
print('主current_thread:',current_thread().name)  # 拿到的是主线程
print('',os.getpid())  # 拿到pid

守护线程

from threading import Thread,current_thread
import time

def task(i):
    print(current_thread().name)
    time.sleep(i)
    print('G')
# for i in range(3):
#     t = Thread(target=task,args=(i,))
#     t.start()
t = Thread(target=task,args=(1,))
t.daemon = True
print('')

主线程的结束也就是以为着进程的结束,主线程必须等待其他非守护线程的结束才能结束,也就是说子线程在运行的时候需要使用进程中的资源,但是主线程一结束资源也就销毁了。

#例子
from threading import Thread
from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)  # 睡眠一秒
    print('end123')

def bar():
    print(456)
    time.sleep(3)  # 睡眠三秒
    print('end456')

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    t1.daemon=True  # 当主线程结束后子线程也会结束
    t1.start()
    t2.start()
    print('main------------')
>>>:123
        456
        main------------
        end123
        end456
##运行完毕并不是终止运行了,主线程会等到其它的非守护线程运行结束之后再结束  

线程间的通信

from threading import Thread

money = 666

def task():
    global money  # 修改全局变量
    money = 999

t = Thread(target=task)
t.start() # 让操作系统创建子线程
t.join()  #  主线程等待子线程运行完毕
print(money)

结论,同一进程下,线程共用的是一个进程中的资源。

互斥锁

from threading import Thread,Lock
import time

n = 100

def task(mutex):
    global n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

t_list = []
mutex = Lock()
for i in range(100):
    t = Thread(target=task,args=(mutex,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(n)

和进程中的锁一样,每次只允许一个线程对数据进行操作,抢到锁的可以进去操作,操作完之后会释放锁,之后其他的进程再进行抢锁,然后操作数据。

 

原文地址:https://www.cnblogs.com/wangnanfei/p/11342122.html