Day035--Python--管道, Manager, 进程池, 线程切换

管道

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex设置成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

管道介绍
View Code 了解

 

from multiprocessing import Process, Pipe

conn1, conn2 = Pipe()
conn1.send('你好')
print('>>>>>>>>>>>')
msg = conn2.recv()
print(msg)
from multiprocessing import Process, Pipe
def func1(conn2):
    msg = conn2.recv()
    print(msg)

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p = Process(target=func1, args=(conn2,))
    p.start()
    conn1.send('你好啊,我叫赛利亚')

   管道错误模拟: 管道关闭, 异常处理

from multiprocessing import Process, Pipe

def func(conn2):
    while 1:
        try:
            # 如果管道一端关闭了, 另外一端接收消息时会报错, 要使用异常处理
            msg = conn2.recv()
            print(msg)
        except EOFError:
            print('对方管道已关闭')
            conn2.close()
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p = Process(target=func, args=(conn2,))
    p.start()
    conn1.send('你好啊')
    conn1.close()
    # conn1.recv()  # OSError: handle is closed

数据共享

  Manager

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Process, Manager

def func(m_dic):
    m_dic['name'] = '大猪蹄子'    # 修改共享数据

if __name__ == '__main__':
    m = Manager()
    m_dic = m.dict({'name': '大佬'})  # 创建共享数据
    print('原始>>>', m_dic)                # 打印初始共享数据
    p = Process(target=func, args=(m_dic,))
    p.start()
    p.join()
    print('主进程>>>>', m_dic)    # 打印的是修改后的共享数据
'''
多进程同时获取数据, 修改后重新赋值, 可能同时拿到100, 减1后都把99赋值回去, 得到的数据不准确,数据不安全
可以通过加锁解决
'''
from multiprocessing import Process, Manager

def func(m_dic):
    m_dic['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    m_dic = m.dict({'count': 100})
    lst = []
    for i in range(50):
        p = Process(target=func, args=(m_dic,))
        p.start()
        lst.append(p)

    [p.join() for p in lst]
    print('主进程>>>', m_dic)
# 加锁, 解决数据错乱问题
from multiprocessing import Process, Manager, Lock

def func(m_dic, ml):
    # with ml: 下面的缩进内容等同于 ml.acquire()  ml.release() 之间的内容, 作用: 加锁
    with ml:   
        m_dic['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    ml = Lock()
    m_dic = m.dict({'count': 100})
    lst = []
    for i in range(50):
        p = Process(target=func, args=(m_dic, ml))
        p.start()
        lst.append(p)

    [p.join() for p in lst]

    print(m_dic)

进程池  

  什么是进程池?进程池的作用. 并行 并发 同步 异步 阻塞 非阻塞 互斥 死锁.

import time
from multiprocessing import Process, Pool

def func(n):
    time.sleep(1)
    print(n)

if __name__ == '__main__':
    pool = Pool(4)  # 设置进程数量, 如果不设置, 默认是CPU数量
    pool.map(func, range(100))   # map自带join功能, 异步执行任务, 参数是可迭代对象

   进程池比多进程效率高

import time
from multiprocessing import Process, Pool

def func(n):
    for i in range(100):
        n += 1
    print(n)

if __name__ == '__main__':
    pool_start_time = time.time()
    pool = Pool()
    pool.map(func, range(100))
    pool_end_time = time.time()
    pool_dif_time = pool_end_time - pool_start_time

    lst = []
    p_s_time = time.time()
    for i in range(100):
        p = Process(target=func, args=(i,))
        p.start()
        lst.append(p)
    [p.join() for p in lst]
    p_e_time = time.time()
    pd_time = p_e_time - p_s_time

    print('进程池执行时间>>>', pool_dif_time)
    print('多进程执行时间>>>', pd_time)
View Code 进程池与多进程运行时间对比

   

import time
from multiprocessing import Process, Pool

def func(i):
    time.sleep(0.5)
    print(i**2)

if __name__ == '__main__':
    pool = Pool(4)
    pool_s_time = time.time()
    pool.map(func, range(100))
    pool_e_time = time.time()
    pool_dif_time = pool_e_time - pool_s_time

    p_lst = []
    p_s_time = time.time()
    for i in range(100):
        p = Process(target=func, args=(i,))
        p.start()
        p_lst.append(p)
    [p.join() for p in p_lst]
    p_e_time = time.time()
    p_dif_time = p_e_time - p_s_time

    print('数据池执行时间:', pool_dif_time)
    print('多进程运行时间:', p_dif_time)
View Code 这种情况下进程池比多进程 运行慢

  进程池的同步方法: apply

import time
from multiprocessing import Process, Pool

def fun(i):
    time.sleep(0.5)
    # print(i)
    return i**2

if __name__ == '__main__':
    p = Pool(4)
    for i in range(10):
        res = p.apply(fun, args=(i,))   # 同步执行的方法, 它会等待任务的返回结果(return)
        print(res)                     # 打印的是fun的返回值(return)

   print('主进程结束') # 子进程都结束后打印

  进程池的异步方法:  apply_async      # [ apply()方法的变体,它返回一个结果对象。]

import time
from multiprocessing import Process, Pool

def fun(i):
    time.sleep(0.5)
    return i**2

if __name__ == '__main__':
    p = Pool(4)
    res_lst = []
    for i in range(10):
        res = p.apply_async(fun, args=(i,))    # 异步执行, res是对象multiprocessing.pool.ApplyResult object  主进程代码执行完毕不会等待子进程, 直接关闭主进程.
        res_lst.append(res)
    for i in res_lst:
        print(i.get())

   print('主进程结束') # 如果没有i.get()方法, 则主进程不会等待子进程执行完就会结束

get([timeout])

在产生结果时返回该结果。如果超时限制不是空, 而且结果没有在时限内返回, 则抛出多进程超时错误。 如果远程调用报出异常,那么get()方法将再次抛出这个异常。

# 如果不加close和join, 程序会直接随主进程结束运行,不会等待打印i. 加join后可以感知进程的运行

import time
from multiprocessing import Process, Pool

def fun(i):
    time.sleep(0.5)
    print(i)
    return i**2

if __name__ == '__main__':
    p = Pool(4)
    res_lst = []
    for i in range(10):
        res = p.apply_async(fun, args=(i,))   
        res_lst.append(res)
        # print(res)      # 异步执行, res是多个对象  <multiprocessing.pool.ApplyResult object at 0x000001B5BBD7C128>

    p.close()     # 不是关闭进程池,而是不允许再有其他任务来使用进程池
    p.join()       # 这是感知进程池中任务的方法,等待进程池的任务全部执行完
    for el in res_lst:
        print('结果>>>', el.get())

   # time.sleep(4) # 如果把close和join还有for循环都注释掉, 此处等待几秒也可以打印出i
print('主进程结束')

  

  回调函数 callback

import os
from multiprocessing import Process, Pool
def func1(n):
    print('func1', os.getpid())
    return n*n

def func2(nn):
    print('func2', os.getpid())
    print(nn)

if __name__ == '__main__':
    print('主进程:', os.getpid())
    p = Pool(4)
    p.apply_async(func1, args=(10,), callback=func2)   #把func的返回结果传参给func2, func2 在主进程中运行    如果func1返回多个结果, 那么将以元组的形式传给func2
    p.close()
    p.join()

线程切换

#什么是线程:
#指的是一条流水线的工作过程,关键的一句话:一个进程内最少自带一个线程,其实进程根本不能执行,进程不是执行单位,是资源的单位,分配资源的单位
#线程才是执行单位
#进程:做手机屏幕的工作过程,刚才讲的
#我们的py文件在执行的时候,如果你站在资源单位的角度来看,我们称为一个主进程,如果站在代码执行的角度来看,它叫做主线程,只是一种形象的说法,其实整个代码的执行过程成为线程,也就是干这个活儿的本身称为线程,但是我们后面学习的时候,我们就称为线程去执行某个任务,其实那某个任务的执行过程称为一个线程,一条流水线的执行过程为线程

#进程vs线程
#1 同一个进程内的多个线程是共享该进程的资源的,不同进程内的线程资源肯定是隔离的
#2 创建线程的开销比创建进程的开销要小的多


#并发三个任务:1启动三个进程:因为每个进程中有一个线程,但是我一个进程中开启三个线程就够了
#同一个程序中的三个任务需要执行,你是用三个进程好 ,还是三个线程好?
#例子:
    # pycharm 三个任务:键盘输入  屏幕输出  自动保存到硬盘
    #如果三个任务是同步的话,你键盘输入的时候,屏幕看不到
    #咱们的pycharm是不是一边输入你边看啊,就是将串行变为了三个并发的任务
    #解决方案:三个进程或者三个线程,哪个方案可行。如果是三个进程,进程的资源是不是隔离的并且开销大,最致命的就是资源隔离,但是用户输入的数据还要给另外一个进程发送过去,进程之间能直接给数据吗?你是不是copy一份给他或者通信啊,但是数据是同一份,我们有必要搞多个进程吗,线程是不是共享资源的,我们是不是可以使用多线程来搞,你线程1输入的数据,线程2能不能看到,你以后的场景还是应用多线程多,而且起线程我们说是不是很快啊,占用资源也小,还能共享同一个进程的资源,不需要将数据来回的copy!
View Code什么是线程

   线程的创建方法一:

# 线程和进程很像, 一个进程中至少有一个线程, 进程是资源层面的, 线程负责实际的操作

import time
from threading import Thread

def func(n):
    time.sleep(1)   # 子线程运行地太快了, 如果不加time.sleep,会在打印主线程之前跑完
    print(123)

if __name__ == '__main__':
    t = Thread(target=func, args=(1,))
    t.start()
    t.join()      # 等待子线程跑完再执行主线程下面的内容
    print('主线程')

  线程的创建方法二:

from threading import Thread

class MyThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = n
    def run(self):
        print('换汤不换药')
        print('self.n>>>', self.n)

if __name__ == '__main__':
    t = MyThread('你好')
    t.start()
    t.join()
    print('主线程结束')
原文地址:https://www.cnblogs.com/surasun/p/9848072.html