Python 进程间通信 线程

一 进程间通信(IPC

IPC

进程之间相互隔离,当一个进程想要把数据给另一个进程,就要考虑IPC(进程间通信)

进程间通信的方式:

1. 管道:只能单向通讯,数据都是二进制

2. 文件:在硬盘上创建共享文件,

  缺点:速度快

  优点:数据量几乎没有限制

3. socket:编程复杂度较高

4. 共享内存:必须要操作系统来分配

  优点:速度快

  缺点:数据量不能太大

1.manger类 了解

需要强调的是 Manager创建的一些数据结构是不带锁的 可能会出现问题

Manager提供很多数据结构  list dict等等

Manager所创建出来的数据结构,具备进程间共享的特点
from multiprocessing import Process,Manager,Lock
import time


def task(data,l):
    l.acquire()
    num = data["num"] #
    time.sleep(0.1)
    data["num"] = num - 1
    l.release()

if __name__ == '__main__':
    # 让Manager开启一个共享的字典
    m = Manager()
    data = m.dict({"num":10})

    l = Lock()

    for i in range(10):
        p = Process(target=task,args=(data,l))
        p.start()

    time.sleep(2)
    print(data)
View Code

2.Queue队列

队列:先进先出
堆栈:先进后出

队列是一种特殊的数据结构,先存储的先取出    就像排队    先进先出

​相反的是堆栈,先存储的后取出, 就像衣柜 桶装薯片    先进后出

​扩展:

​  函数嵌套调用时  执行顺序是先进后出     也称之为函数栈  

​  调用 函数时  函数入栈   函数结束就出栈

from multiprocessing import Queue
# 创建队列对象  不指定maxsize 则没有数量限制
q = Queue(3)
# 存放值
q.put(1)
q.put(2)
q.put(3)
# print(q.full())  # 判断队列是否满了 True
# q.put(4)  # 如果容量已经满了,在调用put时将进入阻塞状态 直到有人从队列中拿走数据有空位置 才会继续执行

#取值
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
print(q.empty())  # 判断队列中的数据是否取完 True
# print(q.get())# 如果队列已经空了,在调用get时将进入阻塞状态 直到有人从存储了新的数据到队列中 才会继续
# print(q.get_nowait())  # 取值 没有值不等待直接报错

补充:
#block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列为空时 抛出异常
q.get(block=True,timeout=2)
# block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列满了时 抛出异常
# q.put("123",block=False,)
# timeout 表示阻塞的超时时间 ,超过时间还是没有值或还是没位置则抛出异常  仅在block为True有效
"""
full
get_nowait
empty
都不适用于多进程的情况
"""
View Code
from multiprocessing import Process,Queue

def producer(q):
    q.put('hello GF~')

def consumer(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()


"""
子进程放数据 主进程获取数据
两个子进程相互放 取数据
"""
View Code

3.生产者和消费者

1.生产者:产生数据的一方称之为生产者。

  消费者:处理数据的一方称之为消费者

2.问题:生产者和消费,处理速度不平衡,一方快一方慢,导致一方需要等待另一方。

3.解决方法:

将双方分开来.一专门负责生成,一方专门负责处理

这样一来数据就不能直接交互了 双方需要一个共同的容器

生产者完成后放入容器,消费者从容器中取出数据

这样就解决了双发能力不平衡的问题,做的快的一方可以继续做,不需要等待另一方

def eat(q):
    for i in range(10):
        # 要消费
        rose = q.get()
        time.sleep(random.randint(0, 2))
        print(rose,"吃完了!")

# 生产任务
def make_rose(q):
    for i in range(10):
        # 再生产
        time.sleep(random.randint(0, 2))
        print("第%s盘青椒肉丝制作完成!" % i)
        rose = "第%s盘青椒肉丝" % i
        # 将生成完成的数据放入队列中
        q.put(rose)

if __name__ == '__main__':
    # 创建一个共享队列
    q = Queue()
    make_p = Process(target=make_rose,args=(q,))
    eat_p =  Process(target=eat,args=(q,))


    make_p.start()
    eat_p.start()
View Code

4.JoinableQueue

1.生产者和消费者问题:

生产者生产结束以后,消费者一直循环吃,吃完以后,消费者并不知道
生产者已经生产结束,就会一直等待生产者生产东西

2.解决方法:JoinableQueue

例1:
from multiprocessing import JoinableQueue
# 可以被join的队列,join是等待摸个任务完成,able 可以怎么着, Queue是队列
from multiprocessing import Process,Queue,JoinableQueue
import random
import time


def producer(name,food,q):
    for i in range(10):
        data = '%s生产了%s%s'%(name,food,i)
        time.sleep(random.random())
        q.put(data)
        print(data)

def consumer(name,q):
    while True:
        data = q.get()
        if data == None:break
        print('%s吃了%s'%(name,data))
        time.sleep(random.random())
        q.task_done()  # 告诉队列你已经从队列中取出了一个数据 并且处理完毕了,task_down的次数 == get的次数,一次get对应一个taskdone连起来用



if __name__ == '__main__':
    q = JoinableQueue()

    p = Process(target=producer,args=('大厨egon','馒头',q))
    p1 = Process(target=producer,args=('跟班tank','生蚝',q))
    c = Process(target=consumer,args=('许兆龙',q))
    c1 = Process(target=consumer,args=('吃货jerry',q))
    p.start()
    p1.start()
    c.daemon = True
    c1.daemon = True
    c.start()
    c1.start()
    p.join()
    p1.join()

    q.join()  # 阻塞等到队列中数据全部取出
    print('')
例2:
from multiprocessing import JoinableQueue
# 可以被join的队列,join是等待摸个任务完成,able 可以怎么着, Queue是对垒

q = JoinableQueue()

q.put("1")
q.put("2")

print("取走了一个%s " % q.get())
q.task_done() # task_down 告诉队列这个数据已经处理完了
# 该函数不是表示任务全部处理完成,而是取出某个数据处理完成

print("再取走了%s" % q.get())
q.task_done()

print("......")
# q.join()  #等待队列中的数据处理完毕 , join, task_down的次数 == put的次数
print("Game Over")
View Code

二 线程

1.线程基础

1.什么线程:线程是操作系统运算调度的最小单位 (CPU最小执行单位),进程好比是一个车间(资源单位)线程就是车间的流水线(执行单位)。

2.线程和进程的区别:

 1.线程创建的开销远小于进程

 2.统一进程中的所有线程共享进程内的资源

 3.线程之间没有父子关系,都是平等的,PID相同

3.线程特点:

  1.每个进程都会有一个默认的线程

  2.每个进程可以存在多个线程

  3.同一进程中的所有线程之间数据是共享的

  4.创建线程的开销远比创建进程小的多

如何选择:

要根据具体的任务类型,IO密集 计算密集

4.开启线程的两种方式

1.实例化Tread类,target参数用于指定子线程要执行的任务
from threading import  Thread

def task():
    print("子线程 run........")

t = Thread(target=task)
t.start()
print("over")

2.继承Tread类,覆盖run方法
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s is running'%self.name)
        time.sleep(3)
        print('%s is over'%self.name)

t = MyThread('egon')
t.start()
print('')
**与进程在使用方法上没有任何区别,不同的是开启子线程的代码可以写在任意位置**

之所以使用方法完全相同是因为,多进程其实是为了弥补多线程的缺憾而诞生的。详见GIL锁
View Code

2.Tread类的常用属性:

# threading模块包含的常用方法
import threading
print(threading.current_thread().name) #获取当前线程对象
print(threading.active_count()) # 获取目前活跃的线程数量
print(threading.enumerate()) # 获取所有线程对象


t = Thread(name="aaa")
# t.join() # 主线程等待子线程执行完毕
print(t.name) # 线程名称
print(t.is_alive()) # 是否存活
print(t.isDaemon()) # 是否为守护线程




from threading import Thread,current_thread,active_count
import time
import os

def task(name,i):
    print('%s is running'%name)
    # print('子current_thread:',current_thread().name)
    # print('子',os.getpid())
    time.sleep(i)

    print('%s is over'%name)
# 开线程不需要在__main__代码块内 但是习惯性的还是写在__main__代码块内
t = Thread(target=task,args=('egon',1))
t1 = Thread(target=task,args=('jason',2))
t.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
t1.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
t1.join()  # 主线程等待子线程运行完毕
print('当前正在活跃的线程数',active_count())
# 小的代码执行完 线程就已经开启了
print('')
# print('主current_thread:',current_thread().name)
# print('主',os.getpid())
View Code

3.守护线程

设置守护线程的语法与进程相同,相同的是也必须放在线程开启前设置,否则抛出异常。

守护线程的特点:
1.守护线程会在被守护线程结束后立即结束,
2.主线程的结束也就意味着进程的结束
  主线程必须等待其他非守护线程的结束才能结束
  (意味子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了资源也就销毁了)
   换句话说,守护线程会在所有非守护线程执行完毕后结束。
from threading import  Thread
import time

def task():
    print("start....1")
    time.sleep(3)
    print("end......1")

def task2():
    print("start....2")
    time.sleep(4)
    print("end......2")

t = Thread(target=task)
t.daemon = True
t.start()

t2 = Thread(target=task2)
t2.start()

print("main over!")
View Code

 

4.线程间通信

from threading import Thread


money = 666

def task():
    global money
    money = 999

t = Thread(target=task)
t.start()
t.join()
print(money) # 999

# 同一进程下的线程间数据是共享的
View Code

5.互斥锁

from threading import Thread,Lock
import time


n = 100

def task(mutex):
    global  n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

t_list = []
mutex = Lock()
for i in range(100):
    t = Thread(target=task,args=(mutex,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(n) # 0
View Code
原文地址:https://www.cnblogs.com/tfzz/p/11341143.html