进程通信与线程初识

一.进程通信机制

进程间通信的IPC(Inter-Process Communication)机制:通过队列来实现进程间的数据共享

底层队列通过管道和锁定实现

队列初识

#
from multiprocessing import Queue

q = Queue(5) # 不传参数,默认是这个队列的最大存储数

for i in range(5) :
    q.put(i)
print(q.full())  # 判断队列是否满了
# q.put(6) # 当队列的达到最大长度之后,再放入数据不会报错
# 会陷入阻塞态,直到队列中的元素被取出

for i in range(4) :
    print(q.get())
print(q.empty()) # 判断队列中元素是否为空
# print(q.get()) #当队列中的长度为0时,再次获取值
#程序会陷入阻塞态,直到列表中获得新的值
print(q.get_nowait())
# 取值 当队列为空时,不会陷入阻塞态,而是直接报错
View Code

Queue实现单进程通信

from multiprocessing import  Process,Queue

def producer(q) :
    q.put('beautiful world!')

def consumer(q) :
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target= producer,args=(q,))
    c = Process(target= consumer,args=(q,))
    p.start()
    c.start()
View Code

JoinableQueue实现多进程通信

from multiprocessing import Process,JoinableQueue
import random
import time

def producer(name,food,q) :

    for i in range(10) :
        data = "%s生产了%s%s " %(name,food,i)
        time.sleep(random.random())
        q.put(data)
        print(data)

def consumer(name,q) :

    while True :
        data = q.get()
        print('%s吃了%s' %(name,data))
        time.sleep(random.random())
        q.task_done() # 告诉队列已经在队列中获取到了一个数据,并且处理完毕

if __name__ == '__main__':
    q = JoinableQueue()

    p = Process(target= producer,args=('大厨Dragon','鸡排',q))
    p1 = Process(target= producer,args=('打杂','蛋挞',q))
    c = Process(target= consumer,args=('吃货1',q))
    c1 = Process(target= consumer,args=('吃货2',q))

    p.start()
    p1.start()
    c.daemon = True
    c1.daemon = True # 消费者成为主进程的守护进程
    c.start()
    c1.start()
    p.join()
    p1.join() # 生产者执行完毕
    # 主线程的结束也就意味着进程的结束
    # 主线程必须等待其他非守护线程的结束才能结束
    q.join() # 等待队列中的数据全部去取出
View Code

PS:JoinableQueue中的join方法能在队列中的全部数据取出后解除阻塞态。

生产者与消费者模型

适用范围:生产者消费与消费者模型能解决绝大多数的并发问题

生产者:生产/制造数据

消费者:消费/处理数据

队列:帮助生产者与消费者通信的的媒介

二.线程初识

进程与线程的比较:

进程:资源分配的最小单位

线程:cpu调度的最小单位

每个进程都自带一个线程,线程才是真正的执行单位,进程给运行的线程提供所需资源

为什么使用线程:开启线程的开销要远远小于进程

PS :线程与线程之间的数据是共享的。

生成线程的两种方法

from threading import Thread
import  time


def task(name) :
    print(f'{name}is running')
    time.sleep(3)
    print(f'{name}is over')

t = Thread(target=task,args=('Dragon',))
t.start()
print('zhu')
View Code
from threading import Thread
import time

class MyThread(Thread) :

    def __init__(self,name):
        super().__init__()
        self.name = name


    def run(self):
        print(f'{self.name}is running')
        time.sleep(3)
        print(f'{self.name}is over')


t = MyThread('Dragon')

t.start()
print('zhu')
View Code

PS:在主进程下开启多个线程,每个线程都跟主进程的pid一样

线程对象的方法:

Thread实例对象的方法
  isAlive(): 判断线程是否存活。
  getName(): 返回线程名。
  setName(): 设置线程名。

threading模块提供的一些方法:
  threading.currentThread(): 返回当前的线程变量。
  threading.enumerate(): 返回一个包含正在运行的线程的list。
  threading.activeCount(): 返回正在运行的线程数量。
View Code

守护线程:

主线程结束也就意味着进程终结

主进程必须等待其他非守护线程的结束才能结束

from threading import Thread,current_thread
import time

def task(i) :
    print(current_thread().name)
    time.sleep(i)
    print('GG')

t = Thread(target= task,args=(1,))
t.daemon = True
t.start()
print('zhu')
View Code

线程间可以直接通信.

互斥锁:

from threading import Thread,Lock
import  time

n = 100

def task(mutex) :
    global n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

t_list = []
mutex = Lock()
for i in range(100) :
    t = Thread(target=task,args=(mutex,))
    t.start()
    t_list.append(t)

for t in t_list :
    t.join()
print(n)
View Code
原文地址:https://www.cnblogs.com/Cpsyche/p/11342272.html