【Python】多任务

博观而约取,厚积而薄发。

1. 多任务介绍

多任务是指在同一时间内执行多个任务,例如: 现在电脑安装的操作系统都是多任务操作系统,可以同时运行着多个软件。

1.1 多任务的执行方式

  • 并发

指在一个时间段内,有多个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行

例如:

对于单核cpu处理多任务,不可能存在同一时刻有多个软件同时运行,只不过是CPU运行时间划分成若干个时间段交给不同的程序运行,他们之间是交替运行。但是由于cpu执行速度太快,所以我们感觉就像这些软件都在同时执行一样。

  • 并行

在任意时刻,有多个程序同时运行在多个cpu上

例如:

对于多核cpu处理多任务,由于各个任务不存在争夺cpu的执行权限,也就保证了每个任务独自占用一个cpu内核,此时每个任务在任意时刻他们都是同时运行,不存在交替执行的过程。前提是在任务数小于cpu核数,但是无论任务数有多少,任意时刻不同cpu内核上的任务都是同时运行的。

  • 区别:

并发和并行是即相似又有区别的两个概念,并行是指两个或者多个事件在同一时刻发生;而并发是指两个或多个事件在同一时间间隔内发生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机系统中,每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。倘若在计算机系统中有多个处理机,则这些可以并发执行的程序便可被分配到多个处理机上,实现并行执行,即利用每个处理机来处理一个可并发执行的程序,这样,多个程序便可以同时执行。

2. 进程

在Python程序中,想要实现多任务可以使用进程来完成,进程是实现多任务的一种方式。一个正在运行的程序或者软件就是一个进程,它是操作系统进行资源分配的基本单位,也就是说每启动一个进程,操作系统都会给其分配一定的运行资源(内存资源)保证进程的运行。

注意:

一个程序运行后至少有一个进程,一个进程默认有一个线程称为主线程,进程里面可以创建多个线程,线程是依附在进程里面的,没有进程就没有线程

2.1多进程使用

方式--fork

标准库模块os中提供一个函数fork(),用于将当前进程复制一份子进程,而后父进程和子进程从调用fork()处开始分叉,兵分两路,继续并行运行后面的程序与普通函数不同的是,函数fork()会返回两次,分别在父进程和子进程内返回,返回值分为三种情况:

  1. 返回值小于0,表示复制子进程失败
  2. 返回值等于0,表示处于在子进程中
  3. 返回值大于0,表示处于父进程中,返回值就是子进程的id

windows操作系统中无法调用函数fork()

import os

try:
    pid = os.fork()
except OSError:
    print("操作系统不支持")
    exit()
if pid < 0:
    print("复制子进程失败")
elif pid == 0:
    print("我是子进程%d,我的父进程是%d" % (os.getpid(), os.getppid()))
else:
    print("我是父进程%d,我的子进程是%d" % (os.getpid(), pid))

# 我是父进程30007,我的子进程是30009
# 我是子进程30009,我的父进程是30007

方式一:

标准库模块multiprocessing提供了一个类对象Process,用于表示进程

  1. 根据类对象Process创建进程实例对象

  2. 调用进程实例对象的方法start()启动进程,调用start()后,会自动调用run(),方法run()会自动调用target指定的函数

from multiprocessing import Process, current_process

print("父进程启动(%d----%s)" % (current_process().pid, current_process().name))

def do_something(*args):
    print("子进程启动(%d----%s)" % (current_process().pid, current_process().name))
    print(args)
    print("子进程结束(%d----%s)" % (current_process().pid, current_process().name))

process = Process(target=do_something, args=(5, 10))
process.start()

import time

time.sleep(2)
print("父进程结束(%d----%s)" % (current_process().pid, current_process().name))

Process的__init__方法:

def __init__(self, group=None, target=None, name=None, args=(), kwargs={},*, daemon=None):
  • group:用于指定进程实例对象所属的进程组,默认不属于任何进程组
  • target:用于制定被run()调用的函数,默认没有函数被调用
  • name:用于制定创建进程实例对象的名称,第n个子进程的默认名称为Process-n
  • args:用于制定target接受的未知参数,用元组表示,默认不接受位置参数
  • kwargs:用于制定target接受的关键值参数,用字典表示,默认不接受关键字参数
  • daemon:守护进程

方式二:

  1. 自定义继承自Process的类对象,重写特殊方法__init__()run()
  2. 根据自定义的类对象创建进程实例对象
  3. 调用进程实例对象的方法start()启动进程,调用start()后,会自动调用重写的run()
from multiprocessing import Process, current_process

print("父进程启动(%d----%s)" % (current_process().pid, current_process().name))

class MyProcess(Process):
    def __init__(self, name, args):
        super().__init__(name=name)
        self.args = args

    def run(self):
        print("子进程启动(%d----%s)" % (current_process().pid, current_process().name))
        print(self.args)
        print("子进程结束(%d----%s)" % (current_process().pid, current_process().name))

m = MyProcess(name="MyProcess", args=(5, 10))
m.start()

import time

time.sleep(2)
print("父进程结束(%d----%s)" % (current_process().pid, current_process().name))

2.2 多进程执行的不确定性

from multiprocessing import Process, current_process

import time

def do_something():
    for i in range(4):
        print("%s:%d" % (current_process().name, i))
        time.sleep(2)

for i in range(3):
    Process(target=do_something).start()

do_something()

执行结果:

MainProcess:0
Process-1:0
Process-2:0
Process-3:0
MainProcess:1
Process-1:1
Process-2:1
Process-3:1
Process-1:2
MainProcess:2
Process-2:2
Process-3:2
Process-1:3
MainProcess:3
Process-2:3
Process-3:3

本次测试程序启动了三个子进程去执行do_something方法,之后主进程也执行该方法,通过执行结果可以看出,每个进程之间运行是没有顺序。默认情况下,多个进程的执行顺序和时间都是不确定的,完全取决于操作系统的调度

2.3 守护进程

import time
from multiprocessing import Process, current_process

print("父进程%d启动" % current_process().pid)

class MyProcess(Process):
    def run(self):
        print("子进程%d启动" % current_process().pid)
        time.sleep(2)
        print("子进程%d结束" % current_process().pid)

m = MyProcess()
# m = MyProcess(daemon=True)
m.daemon = True
m.start()

print("父进程%d结束" % current_process().pid)

>>>父进程13717启动
>>>子进程13721启动
>>>父进程13717结束

可以在调用进程实例对象方法的start()之前,将属性daemon的值设置为True,或直接通过创建对象传入关键字参数,从而将进程设置为守护进程。守护进程是为了守护父进程而存在的子进程,当父进程结束时,守护进程就没有了存在的意义,因此,守护进程会随着父进程的结束而结束

2.4 阻塞父进程的子进程方法join

比如:现在有个需求,需要在子进程的程序执行完成以后接着主进程继续运行,但是我们不知道子进程需要执行多长时间,此时就需要使用join方法。他能保证无论子进程执行多久,只有在子进程执行完毕以后,才会执行join方法之后的程序,此时方法join就处于阻塞状态。

import time
from multiprocessing import Process, current_process

print("父进程%d启动" % current_process().pid)

class MyProcess(Process):
    def run(self):
        print("子进程%d启动" % current_process().pid)
        time.sleep(2)
        print("子进程%d结束" % current_process().pid)

m = MyProcess()
m.start()
# m.join()
m.join(1)
print("父进程%d结束" % current_process().pid)

>>>父进程13921启动
>>>子进程13926启动
>>>父进程13921结束
>>>子进程13926结束

在父进程中创建并启动子进程后,可以调用子进程的方法join(),这样子进程会把父进程阻塞,父进程会等子进程执行完之后再从被阻塞的地方继续执行

在调用join()时,可以制指定参数timeout,从而指定子进程阻塞父进程的时间

2.5 全局变量多个进程不共享

from multiprocessing import Process

NUM = 18

def do_something():
    global NUM
    NUM += 1

p = Process(target=do_something)
p.start()
p.join()

# 在子进程中修改全局变量,对父进程中的全局变量没有影响
# 因为:子进程对父进程中的全局变量做了一份拷贝,子进程与父进程中的全局变量NUM是完全不同的两个变量
print(NUM)  # 18

每个进程都有独立的内存空间,从而进程间是相互独立的,因此,全局变量在多个进程之间不能共享。

2.6 多进程操作共享数据是不安全的

from multiprocessing import Process, Value

NUM = Value('i', 0)

def do_something():
    global NUM
    for i in range(10000):
        # 相当于:num = num +1
        # 首先计算num+1,存入临时变量中,然后将临时变量的值赋给NUM
        NUM.value += 1

p1 = Process(target=do_something)
p2 = Process(target=do_something)

p1.start()
p2.start()
p1.join()
p2.join()

print(NUM.value)  # 小于20000
多进程操作共享数据

由于多进程的执行是不确定的,导致多进程操作共享数据的结果是不可预期的,这也常被称为不安全的

3. 线程

在Python中,想要实现多任务除了使用进程,还可以使用线程来完成,线程是实现多任务的另外一种方式。线程是进程中执行代码的一个分支,每个执行分支(线程)要想工作执行代码需要cpu进行调度 ,也就是说线程是cpu调度的基本单位,每个进程至少都有一个线程,而这个线程就是我们通常说的主线程。

3.1 多线程使用

任何进程都会自动创建并启动一个线程,该线程被称为父(主)线程,默认名称是MainThread通过父进程创建的子进程也会自动创建一个线程

方式一:

标准库模块threading提供了一个类对象Thread,用于表示线程

  1. 根据类对象Thread创建线程实例对象

  2. 调用线程实例对象的方法start()启动线程,调用start()后,会自动调用run(),方法run()会自动调用target指定的函数

from threading import Thread, currentThread

print("父线程启动(%d----%s)" % (currentThread().ident, currentThread().name))


def do_something(*args):
    print("子线程启动(%d----%s)" % (currentThread().ident, currentThread().name))
    print(args)
    print("子线程结束(%d----%s)" % (currentThread().ident, currentThread().name))


thread = Thread(target=do_something, args=(5, 10), name="MyThread")
thread.start()

import time

time.sleep(2)
print("父线程结束(%d----%s)" % (currentThread().ident, currentThread().name))

# 执行结果:
>>>父线程启动(140450223359808----MainThread)
>>>子线程启动(140450214668032----MyThread)
>>>(5, 10)
>>>子线程结束(140450214668032----MyThread)
>>>父线程结束(140450223359808----MainThread)

Thread的__init__方法:

def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):
  • group:用于指定线程实例对象所属的线程组,默认不属于任何线程组
  • target:用于制定被run()调用的函数,默认没有函数被调用
  • name:用于制定创建线程实例对象的名称,第n个子线程的默认名称为Thread-n
  • args:用于制定target接受的未知参数,用元组表示,默认不接受位置参数
  • kwargs:用于制定target接受的关键值参数,用字典表示,默认不接受关键字参数
  • daemon:用于制定线程实例对象是否是守护线程,默认不是守护线程

方式二:

  1. 自定义继承自Thread的类对象,重写特殊方法__init__()run()
  2. 根据自定义的类对象创建线程实例对象
  3. 调用线程实例对象的方法start()启动线程,调用start()后,会自动调用重写的run()
from threading import Thread, current_thread

print("父线程启动(%d----%s)" % (current_thread().ident, current_thread().name))

class MyThread(Thread):
    def __init__(self, name, args):
        super().__init__(name=name)
        self.args = args

    def run(self):
        print("子线程启动(%d----%s)" % (current_thread().ident, current_thread().name))
        print(self.args)
        print("子线程结束(%d----%s)" % (current_thread().ident, current_thread().name))

m = MyThread(name="MyThread", args=(5, 10))
m.start()

import time

time.sleep(2)
print("父线程结束(%d----%s)" % (current_thread().ident, current_thread().name))


# 执行结果:
>>>父线程启动(140450223359808----MainThread)
>>>子线程启动(140450214668032----MyThread)
>>>(5, 10)
>>>子线程结束(140450214668032----MyThread)
>>>父线程结束(140450223359808----MainThread)

3.2 多线程执行的不确定性

同多进程。默认情况下,多个线程的执行顺序和时间都是不确定的,完全取决于操作系统的调度

from threading import Thread, current_thread

import time

def do_something():
    for i in range(5):
        print("%s:%d" % (current_thread().name, i))
        time.sleep(2)

for i in range(3):
    Thread(target=do_something).start()

do_something()

执行结果:

Thread-1:0
Thread-2:0
Thread-3:0
MainThread:0
Thread-2:1
Thread-3:1
MainThread:1
Thread-1:1
Thread-2:2
Thread-3:2
MainThread:2
Thread-1:2
Thread-2:3
Thread-3:3
Thread-1:3
MainThread:3

3.3 守护线程

在创建线程实例对象时可以将参数daemon的值设置为True,从而将线程设置为守护线程,守护线程是为了守护父线程而存在的子线程,当父线程结束时,守护线程就没有了存在的意义,因此,守护线程会随着父线程的结束而结束

import time
from threading import Thread, current_thread

print("父线程%d启动" % current_thread().ident)

class MyThread(Thread):

    def run(self):
        print("子线程%d启动" % current_thread().ident)
        time.sleep(2)
        print("子线程%d结束" % current_thread().ident)

# m = MyThread(daemon=True)
m = MyThread()
m.setDaemon(True)
# m.daemon = True
m.start()

time.sleep(1)

print("父线程%d结束" % current_thread().ident)

3.4 阻塞父线程的子线程方法join

在父线程中创建并启动子线程后,可以调用子线程的方法join(),这样子线程会把父线程阻塞,父线程会等子线程执行完之后再从被阻塞的地方继续执行在调用join()时,可以制指定参数timeout,从而指定子线程阻塞父线程的时间

import time
from threading import Thread, current_thread

print("父线程%d启动" % current_thread().ident)

class MyThread(Thread):
    def run(self):
        print("子线程%d启动" % current_thread().ident)
        time.sleep(2)
        print("子线程%d结束" % current_thread().ident)

m = MyThread()
m.start()
# m.join()
m.join(1)
print("父线程%d结束" % current_thread().ident)

3.5 全局变量在进程的所有线程中可以共享

from threading import Thread

NUM = 18

def do_something():
    global NUM
    NUM += 1

t = Thread(target=do_something)
t.start()
t.join()

print(NUM)  # 19

进程内的所有线程共享内存空间,所以全局变量在进程的所有线程中可以共享

3.6 多线程操作共享数据是不安全的

from threading import Thread
import dis

NUM = 0

def do_something():
    global NUM
    for i in range(1000000):
        # 相当于:num = num +1
        # 首先计算num+1,存入临时变量中,然后将临时变量的值赋给NUM
        NUM += 1

t1 = Thread(target=do_something)
t2 = Thread(target=do_something)

t1.start()
t2.start()
t1.join()
t2.join()

print(NUM)  # 小于2000000
多线程操作共享数据

由于多线程的执行是不确定的,导致多线程操作共享数据的结果是不可预期的,这也常被称为不安全的

4. 进程池

4.1 进程池Pool

如果并发的任务数过多,一次性创建并启动大量的进程会给计算机带来很大的压力,那么就可以使用进程池对创建与启动的进程进行限制和管理进程池中能所容纳的进程数目是固定的。

标准库模块multiprocessing中提供一个类对象Pool,用于表示进程池,进程池中所能容纳的进程数目可以在创建Pool实例对象时进程指定,如果不指定,默认大小是CPU的核数

from multiprocessing import Pool
import time, random

print("父进程启动")

def do_something(i):
    print("子进程%d启动" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子进程%d结束,耗时%.2f秒" % (i, end - start))

p = Pool(3) # 将进程池所能容纳的最大进程数指定为3

for i in range(1, 11):
    # 与start()类似,不同的是,创建并启动有进程池管理的子进程
    p.apply_async(do_something, args=(i,))

# 调用方法join()之前,必须调用close()
# 调用close()之后就不能让进程池在管理新的进程了
p.close()

# 父进程被阻塞
# 进程池管理的所有子进程执行完之后,父进程再从被阻塞的地方继续执行
p.join()

print("父进程结束")

# 程序运行后同时创建并运行3个子进程,第四个子进程要等前面三个中的某一个执行完毕之后,才会创建并启动

4.2 ProcessPoolExecutor

在标准库模块concurrent.futures中提供了一个类对象ProcessPoolExecutor,也用于表示进程池。与Pool相比,ProcessPoolExecutor的功能和性能更加强大。

from concurrent.futures import ProcessPoolExecutor
import time, random

print("父进程启动")

def do_something(i):
    print("子进程%d启动" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子进程%d结束,耗时%.2f秒" % (i, end - start))

# 将进程池所能容纳的最大进程数指定为3
p = ProcessPoolExecutor(max_workers=3)

# 将需要进程池处理的任务全部交给进程池,此后会创建并启动由进程池管理的子进程
for i in range(1, 11):
    p.submit(do_something, i)

# 父进程被阻塞
# 进程池管理的所有子进程执行完之后,父进程再从被阻塞的地方继续执行
p.shutdown(wait=True)

print("父进程结束")

ProcessPoolExecutor的父类对象Executor遵守了上下文管理解析,所以可以使用with语句,这样在离开运行时上下文是会自动调用shutdown(wait=True)方法

源码 concurrent.futures._base.py 下面的类对象Executor

def __enter__(self):
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
with ProcessPoolExecutor(max_workers=3) as p:
    """
    for i in range(1, 11):
        p.submit(do_something, i)
    """
    # 等价于:
    p.map(do_something, range(1, 11))

4.3 ProcessPoolExecutor对象方法

  1. submit(self, fn, *args, **kwargs):

返回值是一个Future实例对象,表示子进程所调用的那个函数的执行,比如:do_something(),可以调用Futureresult()得到这个函数的返回值。方法result()是一个同步方法,直到这个函数执行完毕之后方法result()才会返回

with ProcessPoolExecutor(max_workers=5) as p:
     for i in range(1, 11):
        future = p.submit(do_something, i)
        # 同步,需要等待do_something执行完毕
        print(future.result())

p.submit(do_something, i)是立刻返回,可以通过列表保存返回的对象,之后遍历获取返回值

with ProcessPoolExecutor(max_workers=3) as p:
    obj_list = []
    for i in range(1, 11):
        future = p.submit(do_something, i)
        # 异步,无需等待do_something执行完毕
        print(future)
        obj_list.append(future)

for obj in obj_list:
    print(obj.result())
  1. wait(fs, timeout=None, return_when=ALL_COMPLETED):

该函数用于阻塞父进程,以等待指定的Future实例对象序列,直到满足条件

  • fs : 用于指定要等待的Future对象实例序列

  • timeout : 用于指定等待的最长时间,如果指定为None或不指定,则一直等待

  • return_when : 用于指定这个函数何时反复,有三种取值:

    • FIRST_COMPLETED :当第一个Future实例对象已经完成或取消时

    • FIRST_EXCEPTION :当第一个Future实例对象抛出异常时

    • ALL_COMPLETED :当所有Future实例对象已经完成或已被取消时

该函数的返回值是有两个集合组成的元组,第一个集合包含了已经完成或已被取消的所有Future实例对象,第二个集合包含了没有完成并且没有被取消的Future实例对象

from concurrent.futures import ProcessPoolExecutor, wait, as_completed, ALL_COMPLETED, FIRST_COMPLETED

import time, random

def do_something(i):
    time.sleep(random.random() * 10)
    return i * i

p = ProcessPoolExecutor(max_workers=3)
objs = []

for i in range(1, 4):
    future = p.submit(do_something, i)
    objs.append(future)
    
(done, not_done) = wait(objs, return_when=ALL_COMPLETED)
print(done)  # 已经完成的Future对象实例的集合
print(not_done)  # 未完成的Future对象实例的集合
  1. as_completed(fs, timeout=None):

该函数用于将指定的Future实例对象序列转换为一个迭代器,当序列中的任意一个Future实例对象已经完成或已被取消时都会被yield。这样,通过遍历得到的迭代器,就可以在任意一个Future实例对象已经完成或已被取消时立即做一些处理,比如:调用result()方法得到执行结果

  • fs : 用于指定Future实例对象的序列
  • timeout : 用于指定超时时间,如果指定为None或不指定,则不会超时
for i in range(1, 4):
    future = p.submit(do_something, i)
    objs.append(future)

future_iterator = as_completed(objs)
for future in future_iterator:
    print(future.done())
    print(future.cancel())
    print(future.result())

# executor.done() # 查看任务是否完成(True/False)
# executor.cancel() # 取消等待的任务,在运行中或运行完成的任务无法取消
# executor.result() # 获取任务函数返回的结果

5. 线程池

5.1 线程池ThreadPool

第三方库threadpool中提供一个类对象ThreadPool,用于表示线程池,线程池中所能容纳的线程数目可以在创建ThreadPool实例对象时线程指定,如果不指定,默认大小是CPU的核数

from threadpool import ThreadPool, makeRequests
import time, random

print("父线程启动")

def do_something(i):
    print("子线程%d启动" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子线程%d结束,耗时%.2f秒" % (i, end - start))

t = ThreadPool(3)

# 创建需要线程池处理的任务
requests = makeRequests(do_something, args_list=range(1, 11))

# 将需要线程池处理的任务全部交给线程池,此后会创建并启动线程由线程池管理的子线程
for req in requests:
    t.putRequest(req)

# 父线程被阻塞
# 线程池管理的所有子线程执行完之后,互相陈再从阻塞的地方继续执行
t.wait()

print("父线程结束")

# 程序运行后同时创建并运行3个子线程,第四个子线程要等前面三个中的某一个执行完毕之后,才会创建并启动

5.2 ThreadPoolExecutor

与进程池ProcessPoolExecutor用法一致

6. 同步(适用于进程和线程)

为了保证多个线程(进程)能安全的操作共享数据,必须确保一个线程(进程)在操作共享数据时,其他线程(进程)都不能操作。

6.1 互斥锁Lock

一个线程(进程)A在操作共享数据前必须先试图获得锁,从而给相关代码上锁,线程A获得锁之后,锁的状态变成为locked。如果另外一个线程(进程)B试图获得锁,线程(进程)B的状态会变成为blocked并且被添加到锁等待池,只能等待获得锁的线程(进程)A在释放锁之后,锁的状态变为unlocked,线程(进程)调度程序再从锁等待池中处于状态blocked的线程(进程)中选择一个来获得锁,获的锁之后该线程(进程)的状态变为running。由于只有一把锁,无论多少个线程(进程),同一时刻最多只有一个线程(进程)能获得该锁,这样就确保了操作共享数据的相关代码只能有一个线程(进程)从头到尾完成的执行,从而确保了多个线程(进程)操作共享数据总是安全的。但是包含锁的相关代码只能以单线程模式执行,因此效率大大降低了。

Lock
  • 多线程同步之Lock

标准库模块threading中提供了一个类对象Lock,用于表示锁,以实现多线程之间的同步简单的说:同步就意味着”阻塞和等待“。为了保证获得锁的线程(进程)用完了后一定要释放锁,可以将操作共享数据的代码放在try语句块中,把释放锁的代码放在finally语句块。由于类对象Lock遵循了上下文管理协议,所以可以使用with语句进行简化,这样,在进入运行时上下文是会自动调用方法require(),在离开运行时上下文是会自动调用release()

  • acquire(blocking=True, timeout=-1):

    • 当浮点型 timeout 参数被设置为正值调用时,只要无法获得锁,将最多阻塞 timeout 设定的秒数。
    • 当调用时参数 blocking 设置为 True (缺省值),阻塞直到锁被释放,然后将锁锁定并返回 True 。如果blocking 被设置为 False 的情况下调用,将不会发生阻塞。如果调用时已经锁定,则立即返回 False ,表示获取锁失败;否则,将锁锁定并返回 True。
  • release():

    • 释放一个锁。这个方法可以在任何线程中调用,不单指获得锁的线程。当锁被锁定,将它重置为未锁定,并返回。如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。
from threading import Thread, Lock

lock = Lock() # 定义锁,默认状态是解锁
NUM = 0

def do_something():
    global NUM
    for i in range(1000000):
        """
        lock.acquire() # 上锁
        try:
            NUM += 1
        finally:
            lock.release() # 释放锁(解锁)
        """
        with lock:
            NUM += 1

t1 = Thread(target=do_something)
t2 = Thread(target=do_something)

t1.start()
t2.start()
t1.join()
t2.join()

print(NUM)  # 2000000

6.2 死锁

6.2.1 线程死锁

如果有多个共享数据,在多个线程操作这多个共享数据时,如果两个线程分别通过加锁占有一部分共享数据,并且同时等待对方释放锁,这样就会导致两个线程永远相互等待而产生死锁。要避免程序中出现死锁的情况,在避免死锁的算法中最有代表性的算法是Dijkstra提出的银行家算法

from threading import Thread, Lock, current_thread

numa = 0
numb = 0
locka = Lock()
lockb = Lock()

def do_something():
    fun1()
    fun2()

def fun1():
    global numa, numb
    locka.acquire()

    try:
        print("%s------func1()-------locka" % current_thread().ident)
        numa += 1
        lockb.acquire()
        try:
            print("%s------func1()-------lockb" % current_thread().ident)
            numb += 1
        finally:
            lockb.release()
    finally:
        locka.release()

def fun2():
    global numa, numb
    lockb.acquire()
    try:
        print("%s------func2()-------lockb" % current_thread().ident)
        numb += 1
        locka.acquire()
        try:
            print("%s------func2()-------locka" % current_thread().ident)
            numb += 1
        finally:
            locka.release()
    finally:
        lockb.release()

t_list = []
for i in range(100):
    t = Thread(target=do_something)
    t_list.append(t)
    t.start()

for item in t_list:
    item.join()

print("父线程结束")
死锁

线程一执行func1(),优先获得locka的锁,此时线程二执行func2(),优先获得lockb锁,之后方法func1()需要获取lockb锁,方法func2()需要获取locka锁,也就是各个线程都需要另外一个线程的资源,但是在获取此前他们各自都没有解锁,也就导致他们都没法上锁,始终保持僵持的状态。这一种状态就是死锁。

6.2.2 进程死锁

进程通过共享内存访问全局变量,同样可以利用上述测试代码

from multiprocessing import Process, Lock, current_process,Value

numa = Value('i',0)

numb = Value('i',0)

locka = Lock()
lockb = Lock()

......

6.3 RLock

在同一线程中,当调用了Lock的方法acquire()之后,如果在调用方法release()之前再次调用了方法acquire(),也会导致死锁

from threading import Thread, current_thread, RLock, Lock

lock = Lock()
lock.acquire()
print("获得锁")

lock.acquire()
print("获得锁")

lock.release()
print("释放锁")

lock.release()
print("释放锁")

>>>获得锁
>>>

标准库模块threading中还提供了一个用于表示锁的类对象RLock(可重入锁)。与Lock相同的是:RLock也提供了获得锁的方法acquire(),和释放锁的方法release()Lock不同的是:在同一个线程中,当调用了RLock的方法acquire()之后。可以再调用方法release()之前可以多次调用方法acquire()而不会导致死锁。

lock = RLock()

lock.acquire()
print("获得锁")
lock.acquire()
print("获得锁")
lock.release()
print("释放锁")
lock.release()
print("释放锁")

>>>获得锁
>>>获得锁
>>>释放锁
>>>释放锁

源码分析:

在RLock的内部维护了一个Lock和计数器count。count记录了锁被acquire的次数。当线程第一次调用方法acquire()获的锁是,锁的拥有者被保存,同时计数器count被初始化为1;但再次调用方法acquire()时首先会判断调用这是否是锁的拥有者。如果是,计数器count加一。RLock相当于一个门可以上多把锁,上多少锁就的开多少把锁,因此方法acquire()和release()必须成对出现,如果在某个线程中,调用了n次acquire(),必须调用n次release()才能释放该线程所占用的锁

RLock
class _RLock:

    def __init__(self):
        self._block = _allocate_lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        me = get_ident() # 获取线程id
        if self._owner == me: # 判断当前线程时候是自己,如果是第一次上锁,__owner默认是None,则不执行
            self._count += 1 # 如果线程已经上过一次锁,则计数器加一
            return 1
        rc = self._block.acquire(blocking, timeout) # 调用底层Lock锁的acquire方法,上锁成功返回True,否则阻塞
        if rc:
            self._owner = me # __owner记录为当前线程id
            self._count = 1 # 计数器设置为1
        return rc # 返回布尔值 

    __enter__ = acquire

    def release(self):
        if self._owner != get_ident(): # 判断调用释放锁的线程id是否为自己,意思就是解铃还须系铃人
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1 # 计数器减一
        if not count: # 计数器为0时表示解锁完成
            self._owner = None # 取消当前线程与锁的绑定
            self._block.release() # 调用底层Lock锁的release方法进行解锁

    def __exit__(self, t, v, tb):
        self.release()

6.4 Condition

标准库模块threading中提供了一个类对象Condition,用于表示带触发条件的锁,以帮助我们处理多线程间复杂的同步问题,Condition允许一个或多个线程等待触发条件,直到收到另一个线程的通知

from threading import Thread, Condition
import time

cond = Condition()


class MyThread1(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        cond.acquire()
        print("%s:快来gank" % self.name)

        cond.wait()

        time.sleep(2)
        print("%s:怎么还不来啊,都快越塔了" % self.name)
        cond.notify()

        cond.wait()
        # 思考两秒再说
        time.sleep(2)
        print("%s:first blood" % self.name)
        cond.notify()

        cond.release()


class MyThread2(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        cond.acquire()
        # 思考2秒
        time.sleep(2)
        print('%s:打完f6就来' % self.name)
        cond.notify()

        cond.wait()
        time.sleep(2)
        print('%s:马上,你在猥琐一下' % self.name)
        cond.notify()

        cond.wait()
        time.sleep(2)
        print('%s:我去,你好坑啊' % self.name)

        cond.release()


MyThread1("中单").start()
MyThread2("打野").start()


>>>中单:快来gank
>>>打野:打完f6就来
>>>中单:怎么还不来啊,都快越塔了
>>>打野:马上,你在猥琐一下
>>>中单:first blood
>>>打野:我去,你好坑啊

分析:
分别定义两个线程,为Thread1和Thread2,线程启动之后两个线程争先上锁(此时的上锁是必须的,默认的锁是RLock锁)。假如线程1抢到锁,之后可以执行一部分逻辑(打印,中单:快来gank),而线程2则处于阻塞状态,当线程1调用wait时,底层会释放一级锁(RLock),并且保存线程的状态,然后创建一个二级锁(Lock锁),并且执行acquire上锁,还把该锁存入到了一个双端队列,之后再次调用acquire阻塞自身。这个时候线程1释放了RLock,则线程2就可以上锁,同样也执行一部分逻辑,之后需要调用notify发起通知(不能调用wait,会引起死锁),默认只发起一个通知,从队列中拿出队头的二级锁,进行解锁,之后从队列中移除该二级锁。解锁之后,线程1的二级锁就可以上锁,之后便恢复线程的状态,线程2处理完逻辑(打野:打完f6就来),调用wait阻塞等待,此时的线程1恢复了,就从wait被唤醒,执行逻辑(中单:怎么还不来啊,都快越塔了),想要线程2执行就必须调用notify通知。

Condition

源码分析:

class Condition:
    # 条件变量允许一个或多个线程进入到等待状态,直到它们被其他线程唤醒

    def __init__(self, lock=None):
        # 如果传入的lock为空则默认初始化一个RLock实例
        if lock is None:
            lock = RLock()
        self._lock = lock

        # 将Condition的acquire()和release()方法设置为RLock的acquire()和release()方法
        self.acquire = lock.acquire 
        self.release = lock.release 
        
        # 三个方法的代理;因为Lock没有实现这三个方法,就会默认使用Condition的方法
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass

        # 用双端队列记录一个waiting池
        self._waiters = _deque() 

    def __enter__(self):
        return self._lock.__enter__() # 调用可重入锁的__enter__()方法,本质是调用Lock的acquire方法

    def __exit__(self, *args):
        return self._lock.__exit__(*args) # 调用可重入锁的__exit__()方法,本质是调用Lock的release方法
    
    """
    对于RLock而言,调用release()方法,并不一定会真正的释放锁。
    因此,它提供了_release_save()方法,该方法会真正的释放锁,
    并且将RLock内部维护的状态,返回给调用方。
    之后,线程再次获取到底层锁之后,再将状态重置回去
    RLock内部会维护两个状态:owner-拥有锁的线程的id,count-该线程获取了多少次锁
    """
    # 只有底层锁没有_release_save()和_acquire_restore()方法时,才用下面的实现
    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        """
        如果当前线程持有锁,则返回True,否则返回False,其过程是:
      - 使用非阻塞的方式获取锁,如果获取成功,则表明当前线程,没有锁,那么释放刚刚获取到的锁,并返回False;
          - 否则,认为当前线程持有锁,返回True。
        """
        
        # 只有当底层锁没有_is_owned()方法时,才会用这种方式判断当前线程是否拥有底层锁
        # 因为RLock具有_is_owned()方法,所以,它的对象不会使用这里的_is_owned()方法
        if self._lock.acquire(0): 
            self._lock.release()
            return False # 
        else:
            return True

    def wait(self, timeout=None):
        # 1,如果当前线程,没有上锁,那么抛出异常
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")

        # 2,设置一个二级锁(Lock),并且为它上锁,然后放入队列。注意这里的上锁操作,利用Lock的特性,下面就会上演如何轻松实现主动阻塞。
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)

        # 3,释放底层锁,并保存锁对象的内部状态
        # 释放一级锁。这里可以看下Rlock的实现,因为Rlock的计数器可能大于1的,这里提供了私人的方法一步到位的将 self._owner 、 self._count 提取出来。
        saved_state = self._release_save()

     
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 3.1,如果timeout是None,那么再次以阻塞的方式获取二级锁(Lock)对象
                #  - 因此当前线程已经获取了一次该锁,因此当前线程会阻塞,直到其他线程释放该锁
                # 再次对二级锁进行上锁操作,很轻巧的实现自我阻塞,等待其他线程调用 notify() 进行唤醒。
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    # 3.2,如果设置了timeout为正数,阻塞等待设置的时间获取锁,获取到返回True,否则返回False
                    gotit = waiter.acquire(True, timeout)
                else:
                    # 3.3,如果设置了timeout为负数,则以非阻塞的方式获取,没有获取到直接返回False
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)  # 被唤醒后,重新获得对一级锁的拥有。将saved_state数据进行恢复。
            if not gotit:
                try:
                    self._waiters.remove(waiter) 
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        # 1,如果当前线程,没有上锁,那么抛出异常
        if not self._is_owned(): # 判断
            raise RuntimeError("cannot notify on un-acquired lock")
        
        # 备份当前线程队列;定义新得队列,将前n个线程转移到waiters_to_notify
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))

        # 队列为空直接返回
        if not waiters_to_notify:
            return
        
        # 队列不为空,循环释放二级锁,wait函数中的wait.acquire()唤醒;最后尝试将该锁踢出队列
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        self.notify(len(self._waiters))

    notifyAll = notify_all

总结:
在Condition中有两把锁,默认一把RLock和一把Lock。结合方法理解两把锁的作用:

  • wait():调用首先判断当前线程是否上锁,此时的锁是RLock锁,接着通过_allocate_lock()获取一把Lock锁并且上锁加入到双端队列中,之后调用_release_save把一级锁(RLock锁)释放掉返回值得到当前锁重入的次数(count)和线程标识(owner),然后接着再次对二级锁(Lock)进行上锁操作,由于Lock无法重入,也就意味着上一次如果上锁成功,此时上锁将处于阻塞状态,直到其他线程执行notify从队列中释放第一次上的二级锁,此时上锁就可以成功,线程被唤醒,最后通过_acquire_restore尝试获取一级锁(RLock),恢复之前保存的状态saved_state
  • notify:调用首先判断当前线程是否上锁,此时的锁是RLock锁,备份当前线程二级锁(Lock)队列;定义新得队列,将前n个二级锁转移到新队列,如果队列为空直接返回,队列不为空,循环释放二级锁,默认只取队头,最后尝试将该锁踢出队列。

总结一句话:wait分为上下两部分,第一部分为释放RLock锁,获取Lock锁并加入队列,第二部分为获取Lock锁阻塞自身,等待唤醒,唤醒成功恢复RLock锁状态,即:问,你在干嘛?等待回复。notify是从队列中释放Lock锁。即:答,我没在干嘛。

6.5 生产者消费者(多线程Condition)

生产者消费者

生产者消费者问题:假设和有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交换产品。

  • 生产者策略:如果市场剩余产品小于20个,那么就生产4个产品放到市场
  • 消费者策略:如果市场上剩余产品多于10个,那么就从市场上消费3个产品

Condition就比较适合等待与回复的场景,而生产者消费者模式就是,我生产够了,就通知(notify)你消费,我消费快没了,就等待(wait)你生产。

from threading import Thread, Condition
import time

cond = Condition()
count = 0

class Producer(Thread):
    def run(self):
        global count, cond
        while True:
            cond.acquire()
            if count < 20:
                count += 4
                print("%s:生产者生产了4个,当前总共%d个" % (self.name, count))
                cond.notify()
            else:
                print("%s:不生产,等待" % self.name)
                cond.wait()
            cond.release()
            time.sleep(2)

class Consumer(Thread):
    def run(self):
        global count, cond
        while True:
            cond.acquire()
            if count > 10:
                count -= 3
                print("%s:消费者消费了3个,当前总共%d个" % (self.name, count))
                cond.notify()
            else:
                print("%s:不消费,等待" % self.name)
                cond.wait()
            cond.release()
            time.sleep(2)

for i in range(3):
    Producer().start()

for i in range(3):
    Consumer().start()

6.6 Semaphore(信号量)

标准库模块threading中提供了类对象Semaphore,用于表示信号量,他可以帮助我们控制并发线程的最大数量从而实现多线程之间的同步

from threading import Semaphore, Thread
import time, random

sem = Semaphore(3)

class MyThread(Thread):
    def run(self):
        """
        sem.acquire()
        print("%s获得资源" % self.name)
        time.sleep(random.random() * 10)
        sem.release()
        """
        with sem:
            print("%s获得资源" % self.name)
            time.sleep(random.random() * 10)

for i in range(10):
    MyThread().start()

源码分析:

class Semaphore:

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock()) # 定义条件变量,传入Lock锁
        self._value = value # 计数器

    def acquire(self, blocking=True, timeout=None):

        # 非阻塞时timeout必须有值,保证逻辑的正确;
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None

        # 本质是调用Condition的__enter__方法,最终是调用的Lock的acquire方法,上锁
        with self._cond:
            # 循环判断计数器是否等于0,等于0表示目前已经有最大数量的线程在执行,而该线程将被添加到信号量等待池
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                # 添加到等待池,处于阻塞
                self._cond.wait(timeout)
            else:
                # 每上锁成功一次,对资源计数器减一
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        # 对资源释放,计数器加一且唤醒一个等待获取资源的线程
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
Semaphore

举个例子说明一下:
上图中的左侧就把它想象成停车场里面已经使用的车位,中间的信号量计数器就是停车场看门的大爷,右侧信号量等待池就是停车场外面等待停车的用户,刚开始定义信号量对象就是初始化一个具有n个车位的停车场,意思就是首次并发为n,就是一次先有n辆车可以进入停车场,之后看门大爷就把停车场的门关了(即计数器为0了),外面的车就等着什么时候停车场的车出来了一辆车(计数器加1),看门大爷一看,出来了一辆车,哦吼一嗓子,要停车的快来,外面的用户一听,感觉瞬间被唤醒,就去抢,某个用户抢到之后大爷把他放进去,然后又把门给关了(计数器减一),邪魅的一笑,你们就等着吧!!!

6.7 Event

类对象Event也可以实现线程间同步,Event实例对象管理着一个内部标志,通过改变这个内部标志的值可以让一个线程给其他处于阻塞状态的线程发送一个事件信号。从而唤醒这些线程让他们转换为运行好状态

  1. set(self):将内部标志设置为True
  2. is_set(self):将判断内部标志是否给设置为True
  3. clear(self):将内部标志设置为False
  4. wait(self,timeout=None):当内部标志为False时,调用该方法的线程会阻塞。直到另外一个线程调用了set()将内部标志设置为True,被阻塞的线程才会转为运行状态。
from threading import Thread, Event, current_thread

import time

event = Event()
print(event.is_set())  # False

def do_something():
    print("%s开始等待" % current_thread().name)
    event.wait()
    print("%s结束等待" % current_thread().name)

for i in range(3):
    Thread(target=do_something).start()

time.sleep(2)
event.set()

源码分析:

class Event:
    def __init__(self):
        self._cond = Condition(Lock()) # 定义一个条件变量
        self._flag = False # 一个标识符

    def is_set(self):
        return self._flag

    def set(self):
        # 对所有阻塞的线程进行唤醒;
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def clear(self):
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        # 在标记符为False时,上锁,进入阻塞,等待被事件唤醒。
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled

7. 进程间通信

7.1 共享内存

如果想要实现进程之间的通信,共享内存是常见的实现方式之一,它允许多个进程直接访问同一块内存

共享内存

共享内存中的对象的类型必须是ctypes的,ctypes是与C语言兼容的数据类型。为了在共享内存中创建ctypes类型的对象,标准库模块multiprocessing提供了以下两个类:

(1) Value(typecode_or_type,*args,**kwargs):

  • typecode_or_type : 用于指定数值的类型码或ctypes类型

(2) Array(typecode_or_type,size_or_initializer,lock=True):    

  • typecode_or_type : 用于指定数值的类型码或ctypes类型
  • size_or_initializer : 用于指定数组的长度或Python中的序列
from multiprocessing import Value, Array, Process
import ctypes

# 在共享内存中创建一个表示数值的ctypes对象
num = Value('d', 2.3)
# num = Value(ctypes.c_float,2.3)

# 在共享内存中创建一个表示数组的ctypes对象
arr = Array('i', range(1, 5))

# arr = Array(ctypes.c_int, range(1, 5))

def do_something():
    num.value = 1.8
    for i in range(len(arr)):
        arr[i] = -arr[i]

p = Process(target=do_something)
p.start()
p.join()

print(num.value)  # 1.8
print(arr[:])  # [-1, -2, -3, -4]

7.2 管道

管道

函数Pipe(),其返回值是一个元组,元组中包含两个对象,分别表示管道两端的连接。

调用Pipe()时,如果不传入参数或传入的参数为True,管道的工作对象为双向如果传入的参数为False,管道的工作方式为单向,其中对于返回的元组,第一个连接对象只能接受数据,第二个连接对象只能发送数据对于管道两端的连接对象,主要有两个方法:

  1. send(self,obj):用于将参数obj指定的对象发送到管道
  2. recv(self):用于从管道接收对象
from multiprocessing import Pipe

conn1, conn2 = Pipe()  # 分别表示两端的管道连接,参数为False表示单向,为True表示为双向(默认)

conn1.send('conn1发送数据1')
conn1.send('conn1发送数据2')

conn2.send('conn2发送数据1') 
conn2.send('conn2发送数据2')

print(conn1.recv()) # conn2发送数据1
print(conn1.recv()) # conn2发送数据2
print(conn2.recv()) # conn1发送数据1
print(conn2.recv()) # conn1发送数据2

c1, c2 = Pipe(False)

# c1.send("c1")  # OSError: connection is read-only c1只能接收,不能发送
# print(c2.recv())

c2.send("c2")
print(c1.recv()) # c2

7.3 Manager

如果想要实现进程之间的通信,Manager也是常见的实现方式之一与共享内存相比,Manager更加灵活,因为它可以支持多种对象类型,此外,Manager还可以通过网络被不同计算机上的进程所共享。但是Manager的速度要比共享内存慢

from multiprocessing import Manager, Process

def func():
    d[1] = 18
    d['2'] = 56
    l.reverse()

manager = Manager()

# 通过manager创建了一个同于进程通信的字典
d = manager.dict()

# 通过manager创建了一个同于进程通信的列表
l = manager.list(range(5))
p = Process(target=func) # 子进程执行

p.start()
p.join()

# 主进程查看数据时候被修改
print(d) # {1: 18, '2': 56}
print(l) # [4, 3, 2, 1, 0]

7.4 Queue

实现进程间通信还可以通过multiprocessing模块提供的Queue类对象,多个线程可以通过put方法向队列存放数据,方法get可以从队列获取数据,如果队列为空时get方法将会阻塞。

from multiprocessing import Queue, Process
import time

queue = Queue()

def get_data():
    for i in range(10):
        queue.put(i)
        time.sleep(1)
        print("put:", i)


def put_data():
    while True:
        time.sleep(2)
        if queue.empty():
            break
        data = queue.get()
        print("get:", data)

Process(target=get_data).start()
Process(target=put_data).start()

8. 定时器线程

如果想要在指定的时间片段之后再启动子线程,可以使用标准库模块threading提供的类对象Timer,用于表示定时器线程,Timer是Thread的子类,也可以通过方法start()来启动线程。定时器只执行一次。如果需要每个一段时间执行一次,则需要在子线程调用的函数内部再次创建启动子线程

from threading import Timer

def do_something():
    print("do something")
    
timer = Timer(2, do_something)
timer.start()

timer.cancel()  # 取消定时器

9. ThreadLocal

原文地址:https://www.cnblogs.com/ydongy/p/13194895.html