Python中的并发

Python并发

并发三种层次

个人理解,并发是计算机在逻辑上能处理多任务的能力。一般分类三种类型:

  1. 异步,异步本质上是单线程的,因为 IO 操作在很多时候会存在阻塞,异步就是在这种阻塞的时候,通过控制权的交换来实现多任务的。即异步本质上是运行过程中的控制权的交换。最典型的例子就是生产者消费者模型。
    • 异步这个概念在不同的地方有不同的说法,比如 python 里面叫做协程,内部通过生成器来实现控制权的交换。但是无论怎么称呼,异步这种并发方式都脱离不了控制权的交换这么一个事实。
  2. 多进程,进程是一个程序具体的实例,拥有自己独立的内存单元。
  3. 多线程,线程依附于进程,共享存储空间。
    • 由于 Python 官方的解释器 Cython 对多线程有一个全局的锁(GIL),所以 Cython 中的线程局限性会比较大。这里不多解释。

这里还有一个概念需要注意,在使用并发的时候弄清楚需要并发的任务是计算密集还是IO密集
因为异步对于计算密集的任务是无效的。因为异步的本质是 IO 操作过程中阻塞时的控制权交换。在计算密集的任务中是没有这样的阻塞的。

协程

前面说了异步的本质是控制权的交换,这里通过一个生产者消费者模型的例子来体会一下这么个过程。

生成者消费者

def consumer():         # 定义消费者,由于有yeild关键词,此消费者为一个生成器
    print("[Consumer] Init Consumer ......")
    r = "init ok"       # 初始化返回结果,并在启动消费者时,返回给生产者
    while True:
        n = yield r     # 消费者通过yield接收生产者的消息,同时返给其结果
        print("[Consumer] conusme n = %s, r = %s" % (n, r))
        r = "consume %s OK" % n     # 消费者消费结果,下个循环返回给生产者

def produce(c):         # 定义生产者,此时的 c 为一个生成器
    print("[Producer] Init Producer ......")
    r = c.send(None)    # 启动消费者生成器,同时第一次接收返回结果
    print("[Producer] Start Consumer, return %s" % r)
    n = 0
    while n < 5:
        n += 1
        print("[Producer] While, Producing %s ......" % n)
        r = c.send(n)   # 向消费者发送消息并准备接收结果。此时会切换到消费者执行
        print("[Producer] Consumer return: %s" % r)
    c.close()           # 关闭消费者生成器
    print("[Producer] Close Producer ......")

produce(consumer())

新关键字

# 异步IO例子:适配Python3.5,使用async和await关键字
async def hello(index):       # 通过关键字async定义协程
    print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
    await asyncio.sleep(1)     # 模拟IO任务
    print('Hello again! index=%s, thread=%s' % (index, threading.currentThread()))

loop = asyncio.get_event_loop()     # 得到一个事件循环模型
tasks = [hello(1), hello(2)]        # 初始化任务列表
loop.run_until_complete(asyncio.wait(tasks))    # 执行任务
loop.close()                        # 关闭事件循环列表

网络io

async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(url, resp.status)
print(url, await resp.text())

loop = asyncio.get_event_loop()     # 得到一个事件循环模型
tasks = [                           # 初始化任务列表
    get("http://zhushou.360.cn/detail/index/soft_id/3283370"),
    get("http://zhushou.360.cn/detail/index/soft_id/3264775"),
    get("http://zhushou.360.cn/detail/index/soft_id/705490")
]
loop.run_until_complete(asyncio.wait(tasks))    # 执行任务
loop.close()                        # 关闭事件循环列表

线/进程

这里以进程的multiprocessing模块举例,线程可以使用multiprocessing.dummy,所有的API均相同。

例子

import multiprocessing as mp

############## 直接实例化 ############

def func(number):
    result = number * 2

p = mp.Process(target=func, args=(3, )) #实例化进程对象
p.start() #运行进程

############ 类封装 #############

class MyProcess(mp.Process):
    def __init__(self, interval):
        mp.Process.__init__(self)

    # 需要重载的函数
    def run(self):
        print('I'm running)

p = MyProcess(1)
p.start()

#################################

p.terminal() # 主动结束进程
p.join() #让主进程等待子进程结束


# 一些常用的属性
p.pid #获得进程的id号
p.name #获得进程名
p.is_alive() #判断进程是否还存活
p.daemon = True #设置进程随主进程一起结束

mp.active_children() #获得当前进程的所有子进程
mp.current_process() #返回正在运行的进程
os.getpid() #获得当前进程的pid

线程池

from multiprocessing.dummy import Pool as ThreadPool 

tasks = list()

def do_task(item):
    return item

pool = ThreadPool(3)

################ 原始操作  #######################

for item in items:
    pool.apply_async(do_task, (item,)) #添加进程,非阻塞,返回执行结果
    pool.apply(do_task, (item,)) #阻塞

############## map操作  #####################3

results = pool.map(do_task, items)

################################

pool.close() #关闭进程池后不会有新的进程被创建
pool.join() #等到结束,必须在close后使用

进程通信

# Lock(锁)
# 限制对资源的访问

def func(lock): #使用with
    with lock:
        print('I got lock')
def func(lock): #不使用with
    lock.acquire() #请求锁
    try:
        print('I got lock')
    finally:
        lock.release() #释放锁

lock = mp.Lock() #申请锁
p = mp.Process(target=func, args=(lock,))
p.start()

############################################

# Semaphore(信号量)
# 限制资源的最大连接数

def func(s):
    s.aquire() #请求连接
    s.release() #断开连接

s = mp.Semaphore(2) #定义信号量的最大连接数
for i in range(5):
    p = mp.Process(target=func, arg=(s))
    p.start

############################################

# Event(事件)
# 进程间同步

def func(e):
    e.wait() #定义等待时间,默认等待到e.set()为止,阻塞
    e.is_set() #判断消息是否被发出
    print('got')
    
e = mp.Event()
p = mp.Process(target=func, args=(e,))
p.start()
e.set() #发出消息

############################################

# Queue(队列)
# 多进程之间的数据传递

import Queue

Queue.Queue(maxsize = 0)  # 先进先出, maxsize小于等于则不限大小
Queue.LifoQueue(maxsize = 0)  # 后进先出
Queue.PriorityQueue(maxsize = 0)  # 构造一个优先级队列

#异常
Queue.Empty  #当调用非阻塞的get()获取空队列的元素时, 引发异常
Queue.Full  #当调用非阻塞的put()向满队列中添加元素时, 引发异常

# 生存者消费者模型

def produce(q):
    try:
        data = q.put(data, block=, timeout=) 
        # 若block为False且队列已满,则立即抛出Queue.Full
        # 若block为True进程会阻塞timeout指定时间,直到队列有空间,否则抛出Queue.Full
    except:

def cosume(q):
    try:
        q.get(block=, timeout=) #与上同理
    except:

q = mp.Queue()
pro = mp.Process(target=produce, args=(q, ))
cos = mp.Process(target=cosume, args=(q, ))
pro.start()
cos.start()
pro.join()
cos.join()

############################################

# Pipe(管道)
# 多进程之间的数据传递

def func1(pipe):
    while True:
        pipe.send(1)
def func2(pipe):
    while True:
        pipe.recv() #如果管道内无消息可接受,则会阻塞
pipe = mp.Pipe(duplex=) #参数默认为True即管道的两边均可收发
# 返回(conn1, conn2),当参数为False时conn1只能收信息,conn2只能发消息
p1 = mp.Process(target=func1, args=(pipe[0], ))
p2 = mp.Process(target=func2, args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()

并发池

新的并发池模块concurrent.futures再次封装了并发操作,可以用于量大但简单并发操作。
进程线程通用关键字换一下就行。

future对象

from concurrent.futures import ThreadPoolExecutor
import time

def working(message):
    time.sleep(2)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池

worker1 = pool.submit(working, ("hello"))  # 往线程池里面加入一个task
worker2 = pool.submit(working, ("world"))  # 往线程池里面加入一个task

# submit 返回了一个future对象,即未完成的操作,我们可以通过调用函数来查看其状态

worker1.done()  # 判断task1是否结束

worker1.result()  # 查看task1返回的结果
worker2.result()  # 查看task2返回的结果

executor对象

import concurrent.futures

items = list() # 任务对象

def do_task(item): # 处理函数
    return item

#################### submit #########################

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(do_task, item): item for item in items}
    for future in concurrent.futures.as_completed(futures):
        item = futures[future]
        result = future.result()
        print(item, result)

#################### map  #########################

# map跟submit的区别在于submit是无序的,而map是有序的

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for item, result in zip(items, executor.map(do_task, items)):
            print(item, result)

#################### wait  #########################

# wait返回一个元组,包含已完成任务的集合和未完成任务的集合。

pool = ThreadPoolExecutor(5)
futures = []
for item in items:
    futures.append(pool.submit(do_task, item))

concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')

return_when参数可选FIRST_COMPLETED, FIRST_EXCEPTIONALL_COMPLETE
ALL_COMPLETE 会阻塞

参考

Python 并行任务技巧
Python并发编程之线程池/进程池

原文地址:https://www.cnblogs.com/nevermoes/p/10000759.html