python 学习笔记八 进程和线程 (进阶篇)

什么是线程(thread)?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

import threading,time
#定义线程启动后,调用函数
def run(num):
    print("threading...[%s]" % num)
    time.sleep(1)

for i in range(10):
    #方式一
    t = threading.Thread(target=run, args=(i,))#实例化线程,运行run函数,传入参数i
    t.start()#启动线程
#方式二,需要自己先定义一个类,继承threading.Thread class mythread(threading.Thread): def __init__(self,num):
     #经典类继承写法 threading.Thread.__init__(self) self.num
= num def run(self):#必须有一个叫run的方法 print("running on number:%s" % self.num) time.sleep(3) if __name__ == "__main__": t1 = mythread(1) t2 = mythread(2) t1.start() t2.start() print(t1.getName()) #获取线程名 print(t2.getName()) #获取线程名

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

更多方法:

  • start             线程准备就绪,等待CPU调度
  • setName        为线程设置名称
  • getName        获取线程名称
  • setDaemon    设置为后台线程或前台线程(默认)
  •                     如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
  •                     如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
  • join               逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
  • run               线程被cpu调度后执行Thread类对象的run方法

join() 和  setDaemon()

#join方法

import threading import time def sayhi(num): print(
"running on number:%s" % num) time.sleep(1) print("waitting over") if __name__ == "__main__": t_list = []
#开启多个线程
for i in range(10): t = threading.Thread(target=sayhi,args=[i,])#参数要是元组或列表注意加逗号 t.start() t_list.append(t) for j in t_list: j.join() #阻塞子线程,等待所有子线程执行完毕,主线程再运行 print("-------done--------")
def run(n):
    print('[%s]------running----
' % n)
    time.sleep(2)
    print('--done--')

def main():
    t_list = []
    for i in range(5):
        t = threading.Thread(target=run,args=[i,])
        t.start()
        print('starting thread', t.getName())
        t_list.append(t)
    for i in t_list:
        i.join()

m = threading.Thread(target=main,args=[])
m.setDaemon(True) #将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(1)  # 主线程等待1秒,就自动结束并杀死自线程,如果join不加等待时间,t.join()就是一直等待,一直到子线程结束。
print("---main thread done----")

join()与setDaemon()都是等待子线程结束,有什么区别呢:
当执行join()后主线程就停了,直到子线程完成后才开始接着主线程执行,整个程序是线性的。
setDaemon() 为前台线程时,所有的线程都在同时运行,主线程也在运行。只不过是主线程运行完以后等待所有子线程结束。这个还是一个并行的执行,执行效率肯定要高于join()方法的,但是如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。

Python GIL(全局解释器锁Global Interpreter Lock)

我们知道多进程(mutilprocess) 和 多线程(threading)的目的是用来被多颗CPU进行访问, 提高程序的执行效率。 但是在python内部存在一种机制(GIL),无论你启多少个线程,你有多少个cpu, Python在执行的时候在同一时刻只允许一个线程运行。。。python调用的线程并不是由python自己生成而是由C提供的线程API,为了防止原生线程互相冲突,全局解释器允许在同一时刻只有一个原生线程在执行。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。 就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython 就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

线程锁(互斥锁Mutex)

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。所以,可能出现如下问题

#线程锁(互斥锁Mutex)
#一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,
# 此时,如果2个线程同时要修改同一份数据,会出现什么状况?python3.x版本貌似已经默认加锁。。。不会出现抢占
#最后结果为0

import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    num  -=1 #对此公共变量进行-1操作
 
num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待所有线程执行完毕
    t.join()
 
 
print('final num:', num )

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程 运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

添加互斥锁Mutex

import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    lock.acquire() #修改数据前加锁
    num  -=1 #对此公共变量进行-1操作
    lock.release() #修改后释放
 
num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待所有线程执行完毕
    t.join()
 
print('final num:', num )

RLock

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

#递归锁 说白了就是在一个大锁中还要再包含子锁,可以直接使用递归锁不用互斥锁
#用于执行一系列连贯的操作 

import threading,time

def run1():
    #操作一
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    #操作二
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    #同时操作一和操作二和其他附加动作
    lock.acquire()
    res = run1()
    print("----between run1 and run2----")
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == "__main__":
    num, num2 = 0, 0
    lock = threading.RLock() #递归锁
    t_list = []
    for i in range(10):
        t = threading.Thread(target=run3)
        t_list.append(t)
        t.start()
    for i in t_list:
        i.join()
#表示当前还有子线程未运行完毕
while threading.active_count() != 1: #查看当前线程存活数 print(threading.active_count()) else: print("----all threads done----") print(num, num2)

Semaphore(信号量)

Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release() 时+1。计数器不能小于0;当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。基于这个特点,Semaphore经常用来同步一些有“访客上限”的对象,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

#Semaphore(信号量)
#互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,类似于一个线程池。

import threading,time
def run(n): semaphore.acquire() time.sleep(
1) print("run the thread:%s " %n) semaphore.release() if __name__ == "__main__": num = 0 semaphore = threading.BoundedSemaphore(5)#最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run, args=[i, ]) t.start() while threading.active_count() != 1: pass else: print("----all threads done----") print(num)

 Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位真,则其他线程等待直到信号解除。

  • 设置信号 set()

  使用Event的set()方法可以设置Event对象内部的信号标志为真。
  Event对象提供了isSet()方法来判断其内部信号标志的状态。
  当使用event对象的set()方法后,isSet()方法返回真

  • 清除信号 clear()

  使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,
  当使用Event的clear方法后,isSet()方法返回假

  • 等待 wait()

  Event对象wait()的方法只有在内部信号为True的时候才会很快的执行并完成返回。
  当Event对象的内部信号标志位False时,则wait()方法一直等待到其为真时才返回。

def light():
    if not event.isSet():# 判断其内部信号标志的状态
        event.set()  # wait就不阻塞,绿灯状态 Flag = True
    count = 0
    while True:
        if count < 10:
            print("33[42;1m----green light on ----33[0m")
        elif count < 13:
            print("33[43;1m----yellow light on ----33[0m")
        elif count < 20:
            if event.isSet():
                event.clear()  # Flag = False
            print("33[41;1m---red light on ----33[0m")
        else:
            count = 0
            event.set()# 打开绿灯
        time.sleep(1)
        count +=1

def car(n):
    while True:
        time.sleep(1)
        if event.isSet(): #判断其内部信号标志的状态
            print("car [%s] is running." % n)
        else:
            print("cat [%s] is waiting for the red light." % n)
            event.wait()

if __name__ == "__main__":
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

什么是进程 ?

内存中的一个独立的句柄,我们可以理解为一个应用程序在内存中就是一个进程。 各个进程之间是内存相互独立,不可共享的.

import time,os

def f(name):
    time.sleep(1)
    print("hello",name)

if __name__ == "__main__":
    p = Process(target=f,args=("xixi",))#进程和线程的创建方法没有什么太大区别
    p.start()
    p.join()

注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。

from multiprocessing import Process    #需要从multiprocessing导入
import time,os def info(title): print(title) print(
"module name:", __name__) print("parent process:", os.getppid()) print("process id:", os.getpid()) print(" ") def fuck(name): info("33[31;1mfunction fuck33[0m") print("hello", name) if __name__ == "__main__": info("33[32;1mmain process line33[0m") p = Process(target=fuck,args=("hehe",)) #生成进程实例,target调哟个fuck函数,参数为"hehe" p.start() #启动进程 p.join() #设置主进程阻塞,等子进程执行完毕在执行

进程间通讯  

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Manager

from  multiprocessing import Process, Queue, Pipe, Manager

#Manager
# A manager returned by Manager() will support types list, dict, Namespace,
# Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

def f(d,l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(5)
    print(l)

if __name__ == "__main__":
#定义一个manager对象 with Manager()
as manager:
     #dict,list需要通过manager生成 d
= manager.dict() l = manager.list([1, 2, 3, 4, 5]) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
#可以实现多个进程往一个列表追加数据,实现数据共享的效果。
"""
[1, 2, 3, 4, 5, 5]
[1, 2, 3, 4, 5, 5, 5]
[1, 2, 3, 4, 5, 5, 5, 5]
[1, 2, 3, 4, 5, 5, 5, 5, 5]
[1, 2, 3, 4, 5, 5, 5, 5, 5, 5]
{'2': 2, 1: '1', 0.25: None}
[1, 2, 3, 4, 5, 5, 5, 5, 5, 5]
"""

Pipe

from multiprocessing import Process,Pipe
def send(conn):
    conn.send([42, None, "koka"])
    conn.close()

if __name__ == "__main__":
    parent_conn,son_conn = Pipe()
    p = Process(target=send,args=(son_conn,))
    p.start()
    print(parent_conn.recv()) # [42, None, 'koka']
    p.join()

Queue

from multiprocessing import Process,Queue #此队列是multiprocessing封装过的对列
def f(q):
   #global q #此处定义全局变量,子进程是无法获取到队列的,必须传入q q.put([
42, None, "koka"]) if __name__ == "__main__": q = Queue() p = Process(target=f,args=(q,)) p2 = Process(target=f,args=(q,)) p.start() p2.start() print(q.get()) print(q.get()) p.join() """result: [42, None, 'koka'] [42, None, 'koka'] """

进程锁

进程间通过上述方式通信后存在数据抢占的问题,可以使用进程锁。

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply       子进程们将一个一个的执行,后一个子进程的执行永远以前一个子进程的结束为信号,开始执行。
  • apply_async    就是子进程接收到请求之后就各自去执行了
from  multiprocessing import Process,Pool,freeze_support
import time

def Foo(i):
    time.sleep(2)
    return i+100

def Bar(arg):
    print(arg)

if __name__ == '__main__':
    freeze_support()
    pool = Pool(5)
    #print(pool.apply(Foo,(1,)))
    #print(pool.apply_async(func=Foo, args=(1,)).get())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
        #首先执行Foo函数,callback 会将Foo执行完后的返回值接收作为参数传入Bar函数

    print('end')
    pool.close()
pool.terminate() #结束工作进程,不在处理未完成的任务
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

close方法:说关闭进程池,至此,进程池中不在有进程可以接受任务。

terminate和join是一对方法,表示的内容截然相反,执行terminate是结束当前进程池中的所有进程,不管值没执行完。

join方法是阻塞主进程,等待子进程执行完毕,再继续执行主进程。需要注意的是:这两个方法都必须在close方法之后执行。

当然我们也可以不执行这两 个方法,那么子进程和主进程就各自执行各自的,无论执行到哪里,子进程会随着主进程的结束而结束。。。

原文地址:https://www.cnblogs.com/koka24/p/5262154.html