Python中的进程和线程

目录:

  • 一、概念
  • 二、多线程
  • 三、多进程
  • 四、线程锁(同步锁、互斥锁Mutex)
  • 五、线程死锁和递归锁Rlock
  • 六、条件变量同步
  • 七、同步条件(Event)
  • 八、信号量Semaphore
  • 九、队列queue

一、概念

进程就是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为它们合理的分配资源。进程可以通过fork或spawn的方式来创建新的进程来执行其他的任务,不过新的进程也有自己独立的内存空间,因此必须通过进程间通信机制(IPC,Inter-Process Communication)来实现数据共享,具体的方式包括管道、信号、套接字、共享内存区等。

一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一个进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。当然在单核CPU系统中,真正的并发是不可能的,因为在某个时刻能够获得CPU的只有唯一的一个线程,多个线程共享了CPU的执行时间。使用多线程实现并发编程为程序带来的好处是不言而喻的,最主要的体现在提升程序的性能和改善用户体验,今天我们使用的软件几乎都用到了多线程技术,这一点可以利用系统自带的进程监控工具(如macOS中的“活动监视器”、Windows中的“任务管理器”)来证实,如下图所示。

当然多线程也并不是没有坏处,站在其他进程的角度,多线程的程序对其他程序并不友好,因为它占用了更多的CPU执行时间,导致其他程序无法获得足够的CPU执行时间;另一方面,站在开发者的角度,编写和调试多线程的程序都对开发者有较高的要求。

Python既支持多进程又支持多线程,因此使用Python实现并发编程主要有3种方式:多进程、多线程、多进程+多线程。

(引:https://github.com/jackfrued/Python-100-Days/blob/master/Day01-15/Day13/%E8%BF%9B%E7%A8%8B%E5%92%8C%E7%BA%BF%E7%A8%8B.md

二、多线程

在Python早期的版本中就引入了thread模块(现在名为_thread)来实现多线程编程,然而该模块过于底层,而且很多功能都没有提供,因此目前的多线程开发我们推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。

两种实现方式:

直接调用:

# from  time import  time,sleep
# from threading import Thread
#
# def  THR1():
#     start1 = time()
#     print('开始时间1:',start1)
#     sleep(3)
#     end1 = time()
#     print('花费时间1:',(end1-start1))

# def main():
#     t1 = Thread(target=THR1,args=('第一个线程',))  #创建一个对象
#     t1.start()  #对象开始执行
#     t1.join()  #等待子线程运行结束
# if __name__ == '__main__':
    # main()

#
# from  time import  time,sleep
# from threading import Thread
#
# def  THR1():
#     start1 = time()
#     print('开始时间1:',start1)
#     sleep(3)
#     end1 = time()
#     print('花费时间1:',(end1-start1))

# if __name__ == '__main__':=
#         t1 = Thread(target=THR1, args=())
#         t1.start()
#         t1.join()
#         print('花费时间:%s' % (end - start))

继承调用:

# import threading
#
# class MyThread(threading.Thread):   #创建一个类,继承threading.Thread
#     def run(self):  # 定义每个线程要运行的函数
#         print("running on number:1")
#
# if __name__ == '__main__':
#     t1 = MyThread()   #创建一个继承类的对象
#     t1.start()   #运行

普通调用和线程使用对比

# **************普通方法****************
# from  time import  time,sleep
# def  THR1():
#     global start1
#     start1 = time()
#     print('开始时间1:',start1)
#     sleep(3)
#     end1 = time()
#     print('花费时间1:',(end1-start1))
# 
# def THR2():
#     start2 = time()
#     print('开始时间2:',start2)
#     sleep(5)
#     global end2
#     end2 = time()
#     print('花费时间2:',(end2-start2))
# 
# THR1()
# THR2()
# print('总花费时间:',(end2 - start1))
# **************运行结果***************
# 开始时间1: 1559026606.3525321
# 花费时间1: 3.000278949737549
# 开始时间2: 1559026609.352811
# 花费时间2: 5.000133037567139
# 总花费时间: 8.000411987304688
# **************使用线程的方式******************
# from  time import  time,sleep
# from threading import Thread
#
# def  THR1():
#     start1 = time()
#     print('开始时间1:',start1)
#     sleep(3)
#     end1 = time()
#     print('花费时间1:',(end1-start1))
#
# def THR2():
#     start2 = time()
#     print('开始时间2:',start2)
#     sleep(5)
#     end2 = time()
#     print('花费时间2:',(end2-start2))
#
# def main():
#     start = time()
#     t1 = Thread(target=THR1,args=())
#     t2 = Thread(target=THR2,args=())
#     t1.start()
#     t2.start()
#     t1.join()
#     t2.join()
#     end = time()
#     print('总花费时间:%s'%(end-start))
# if __name__ == '__main__':
#     main()
# *****************运行解果****************
# 开始时间1: 1559026179.1514442
# 开始时间2: 1559026179.1514442
# 花费时间1: 3.001293659210205
# 花费时间2: 5.001147270202637
# 总花费时间:5.002148151397705
普通方法和使用线程对比

关键字join和setDamon(True)用法:

知识点一:

当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,当设置多线程时,主线程会创建多个子线程,在python中,默认情况下(其实就是setDaemon(False)),主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束。

知识点二:

当我们使用setDaemon(True)方法,设置子线程为守护线程时,主线程一旦执行结束,则全部线程全部被终止执行,可能出现的情况就是,子线程的任务还没有完全执行结束,就被迫停止。

知识点三:

此时join的作用就凸显出来了,join所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程在终止。

知识点四:join有一个timeout参数:

1.当设置守护线程时,含义是主线程对于子线程等待timeout的时间将会杀死该子线程,最后退出程序。所以说,如果有10个子线程,全部的等待时间就是每个timeout的累加和。简单的来说,就是给每个子线程一个timeout的时间,让他去执行,时间一到,不管任务有没有完成,直接杀死。

2.没有设置守护线程时,主线程将会等待timeout的累加和这样的一段时间,时间一到,主线程结束,但是并没有杀死子线程,子线程依然可以继续执行,直到子线程全部结束,程序退出。

#
# from  time import  time,sleep
# from threading import Thread
#
# def  THR1():
#     start1 = time()
#     print('开始时间1:',start1)
#     sleep(8)
#     end1 = time()
#     print('花费时间1:',(end1-start1))
#
#
# if __name__ == '__main__':
#         t1 = Thread(target=THR1, args=())
#         t1.setDaemon(True)  #以执行主线程为主,不依赖子线程执行结果
#         t1.start()
#        # t1.join()  # 等待t1(子线程)执行结束
#         print('setDaemon(True)效果')

# 运行结果:开始时间1:setDaemon(True)效果 1559018364.6895585

Thread的其他中方法:

thread 模块提供的其他方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
# 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
# run(): 用以表示线程活动的方法。
# start():启动线程活动。
# join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

 三、多进程

Unix和Linux操作系统上提供了fork()系统调用来创建进程,调用fork()函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。fork()函数非常特殊它会返回两次,父进程中可以通过fork()函数的返回值得到子进程的PID,而子进程中的返回值永远都是0。Python的os模块提供了fork()函数。由于Windows系统没有fork()调用,因此要实现跨平台的多进程编程,可以使用multiprocessing模块的Process类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool)、用于进程间通信的队列(Queue)和管道(Pipe)等。

创建进程实例:

# **************未使用进程*****************
# from time import time,sleep
# from random import randint
#
# def download_task(filename):
#     print('下载%s.pdf耗费了时间'%(filename))
#     download_sleep = randint(5,10)
#     sleep(download_sleep)
#     print('%s文件下载,耗费了%d秒时间'%(filename,download_sleep))
#
# def main():
#     start = time()
#     download_task('问案')
#     download_task('问件')
#     end = time()
#     print('下载用了%.3f秒'%(end-start))
#
# if __name__ == '__main__':
#     main()

# 运行结果
# 下载问案.pdf耗费了时间
# 问案文件下载,耗费了8秒时间
# 下载问件.pdf耗费了时间
# 问件文件下载,耗费了6秒时间
# 下载用了14.001秒

#***************使用进程****************
# from multiprocessing import Process
# from os import getpid
# from time import time,sleep
# from random import randint
#
# def download_task(filename):
#     print('现在运行的进程号为:[%s]'%getpid())  #获取进程号
#     print('下载%s.pdf开始'%(filename))
#     download_sleep = randint(5,10)
#     sleep(download_sleep)
#     print('%s文件下载,耗费了%d秒时间'%(filename,download_sleep))
#
# def main():
#     start = time()
#     p1 = Process(target=download_task,args=('问案',))  #使用进程,创建对象
#     p2 = Process(target=download_task,args=('问件',))  #使用进程,创建对象
#     p1.start()
#     p2.start()
#     p1.join()
#     p2.join()
#     end = time()
#     print('下载总用了%.3f秒'%(end-start))
#
# if __name__ == '__main__':
#     main()
# 运行结果
# 现在运行的进程号为:[31724]
# 下载问案.pdf开始
# 现在运行的进程号为:[1484]
# 下载问件.pdf开始
# 问件文件下载,耗费了5秒时间
# 问案文件下载,耗费了8秒时间
# 下载总用了8.691秒

 四、线程锁(同步锁互斥锁Mutex

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

import  threading
from time import sleep,time

num = 100
ls = []
lock =threading.Lock()  #生成锁的对象
def Poc():
    global num  #设置公共变量
    # 使用线程锁,可以保证锁内代码串联运行,确保num-1这个动作执行完成
    #lock.acquire()   #设置线程所开始
    temp=num
    sleep(0.001)
    num = temp-1
    #print(num,end='',flush=True)
    #lock.release() # 设置线程所结束

if __name__ == '__main__':
    start  =time()
    for i in range(100):
        p = threading.Thread(target=Poc)
        p.start()
       # p.join()  #此处添加join会确保每个线程的运行完成。但是由于受到“全局解释器锁”(GIL)影响,会失去使用线程的意义。运行至串联
        ls.append(p)
    # for t in ls: #等待所有线程执行完毕
    #     t.join()  #注意此join是在上一个for循环外。确保线程中的子线程运行结束
    end =time()
    #sleep(1) #防止,主线程中的子线程运行完成。不至于次下面num打印出错,与上面join同样作用
    print('最后数字',num,flush=True)
    print('运行时长:',(end - start))

注意:这里当多个线程都对num赋值的temp进行减1动作的时候。由于使用的是多线程,有可能会出现,当多个子线程切换时,temp-1这个动作还没有完成,这样这时的num还没有发生变化。下一个线程取值num的时候,就会和这个没有处理完的temp-1的num相同,这样就会导致减1这个动作变少。最终没有得出num=0.使用锁这个功能,就是为了保证,当对多个线程修改同一份数据时,确保这个修改动作是完成的,完成后才进行下一个线程。此时锁内的线程也就是串行运行的。相对于线程内使用join,不会使得子线程的全部代码进行串行运行。缩小运行量,减少时间。(打印最终结果时,注意确保子线程都运行完毕了,不然最后的结果还是不一样的。这里使用了另一个环循加join。和另一种方法sleep,直接暂停了一段时间等待)

运行过程:

“全局解释器锁”(GIL)和Lock的区别

这里的GIL是为了保证同一时间只有一个线程在运行。Lock是确保多个线程处理同一块数据时,数据一步修改完成。

 五、线程死锁和递归锁Rlock

定义:在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

import time,threading
alock = threading.Lock()
block = threading.Lock()
ls = []

class slock(threading.Thread):
    def A(self):
        alock.acquire()
        print('这是A1。。。',time.ctime())
        time.sleep(3)
        block.acquire()
        print('这是A2。。。',time.ctime())
        block.release()
        alock.release()
    def B(self):
        block.acquire()
        print('这是B1。。。',time.ctime())
        time.sleep(3)
        alock.acquire()
        print('这是B2。。。',time.ctime())
        alock.release()
        block.release()

    def run(self):
        self.A()
        self.B()

if __name__ == '__main__':
    for i in range(10):
        ls.append(slock())
    for i in ls:
        i.start()
    for i in ls:
        i.join()
        
# 运行结果
# 这是A1。。。 Thu May 30 16:13:10 2019
# 这是A2。。。 Thu May 30 16:13:13 2019
# 这是B1。。。 这是A1。。。Thu May 30 16:13:13 2019
#  Thu May 30 16:13:13 2019
死锁案例

解释:一开始,A1,A2正常打印出来。当进入B函数的时候,B函数最外层的是block锁,打印出B1后开始进入alock锁。但是于此同时,第二个线程也已经开始,它进入的A函数的第一个锁是alock,打印出A1,开始进入block。但是此时的block锁在第一个线程的B函数中,还未释放,准备进入B函数的第二层锁alock,而此时的alock正在被第二个线程占着,准备进入block。因此就造成了死锁的情况。不动啦!!!一直在那里等候。

为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

# import time,threading
# lock = threading.RLock()
# # block = threading.Lock()
# ls = []
# class slock(threading.Thread):
#     def A(self):
#         lock.acquire()
#         print('这是A1。。。',time.ctime())
#         time.sleep(3)
#         lock.acquire()
#         print('这是A2。。。',time.ctime())
#         lock.release()
#         lock.release()
#     def B(self):
#         lock.acquire()
#         print('这是B1。。。',time.ctime())
#         time.sleep(3)
#         lock.acquire()
#         print('这是B2。。。',time.ctime())
#         lock.release()
#         lock.release()
#
#     def run(self):
#         self.A()
#         self.B()
#
# if __name__ == '__main__':
#     for i in range(10):
#         ls.append(slock())
#     for i in ls:
#         i.start()
#     for i in ls:
#         i.join()
递归锁RLock

 六、条件变量同步

 有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。  lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。

wait()条件不满足时调用,线程会释放锁并进入等待阻塞;
notify():条件创造后调用,通知等待池激活一个线程;
notifyAll():条件创造后调用,通知等待池激活所有线程。
import threading,time
from random import randint
Ls = []
lock = threading.Condition()
class Prod(threading.Thread):
    def run(self):
        global Ls
        while True:
            val = randint(0,100)
            print('生产',self.name,Ls.append(val),Ls)
            lock.acquire()
            lock.notify()  #条件创造后调用,通知等待池激活一个线程;
            lock.release()
            time.sleep(3)

class Cons(threading.Thread):
    def run(self):
        while True:
            global Ls
            lock.acquire()
            if len(Ls) == 0:
                lock.wait()  #条件不满足时调用,线程会释放锁并进入等待阻塞;
            print('消费:',self.name,':删除'+str(Ls[0]),Ls)
            del Ls[0]
            lock.release()
            time.sleep(0.2)

if __name__ == '__main__':
    thread = []
    for i in range(5):
        thread.append(Prod())
    thread.append(Cons())
    for i in thread:
        i.start()
    for i in thread:
        i.join()
条件变量同步案例

线程Prod运行完成后,提供条件给Cons。

七、同步条件(Event)

  条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。
import  threading,time
Ls = []
event = threading.Event()
class Boss(threading.Thread):
    def run(self):
        print('Boss:今天加班!!!')
        event.isSet() or event.set()  #设置进入event或者Event设置为True.把下面的内容传给
        time.sleep(2)
        print('Boss:下班啦!!!')
        time.sleep(2)
        event.isSet() or event.set() #设置进入event或者Event设置为True
class Worker(threading.Thread):
    def run(self):
        event.wait()   #Evet.wait() 等待Event事件
        print('Worker:命苦啊!!!')
        time.sleep(1)
        event.clear()  #Event事件结束 .如果不加,下一个wait也将会被执行
        event.wait()   #继续等待
        print('Wokrer:happy!!!')

if __name__ == '__main__':
    for i in range(2):
        Ls.append(Boss())
        #Ls.append(Worker())
    #Ls.append(Boss())
    Ls.append(Worker())
    for i in Ls:
        i.start()
    for i in Ls:
        i.join()
同步条件案例

八、信号量Semaphore

1、基本概念

信号量:是由操作系统管理的一种抽象数据类型,用于在多线程中同步对共享资源的使用。本质上说,信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取

 在threading中,信号量有acquirerelease两个函数。

  • 每当线程想要读取关联了信号量的共享资源时,必须调用acquire,此操作减少信号量的内部变量,如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。
  • 当线程不再需要该共享资源,必须通过release释放,这样,信号线的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。

 

信号量同步机制在线程操作必须以原子操作,才会没有问题,但如果不是原子操作,或者两个操作有一个终止了,就会出现问题,比如:

有两个并发线程,都在等待一个信号量,假设目前信号量的内部值为1,再假设线程A将信号量的值从1减到0,此时线程A拿到资源权限,这时候如果控制器切换到了线程B,线程B将信号量的值从0减到-1,并且在这里被挂起等待,这时控制器回到线程A,信号量已经成为了负值,于是第一个线程也在等待。尽管当时的信号量是可以让线程访问资源的,但是因为非原子操作导致了所有的线程都在状态。

import threading,time
sem = threading.Semaphore(3)  # 控制线程数的多少
class consumer(threading.Thread):
    def run(self):
        if sem.acquire():  # 调用
            print('调用:',self.name)
            time.sleep(2)
            sem.release()  # 释放
            print('释放:',self.name)
            time.sleep(4)
if __name__ == '__main__':
    ls = []
    for i in range(5):
        ls.append(consumer())
    for i in ls:
        i.start()
    for i in ls:
        i.join()

# 调用: Thread-1
# 调用: Thread-2
# 调用: Thread-3
# 释放: Thread-4
# 调用: Thread-1
# 释放: Thread-2
# 调用: Thread-5
#
# 释放: Thread-3
# 释放: Thread-4
# 释放: Thread-5
信号量案例

注意: 

信号量的一个特殊用法是互斥量。互斥量是初始值为1的信号量,可以实现数据、资源的互斥访问。

信号量在支持多线程的编程语言中应用很广,但是他也有可能造成死锁的情况。例如,有一个线程t1,先等待信号量s1,然后等待信号量s2,而线程t2会先等待信号量s2,然后再等待信号量s1,这样就会发生死锁,导致t1等待s2,但是t2在等待s1。

九、队列queue

创建一个“队列”对象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为
空且block为False,队列将引发Empty异常。 Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 此包中的常用方法(q = Queue.Queue()): q.qsize() 返回队列的大小 q.empty() 如果队列为空,返回True,反之False q.full() 如果队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 相当q.get(False) 非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 相当q.put(item, False) q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操作
import queue
q = queue.Queue(5)
q.put('1')
q.put('2')
q.put('3')
q.put('4')
q.put('5')
print(q.queue)  # deque(['1', '2', '3', '4', '5'])
print(q.qsize()) # 5
print(q.all_tasks_done)  # <Condition(<unlocked _thread.lock object at 0x000001DA2AAB7940>, 0)>
#print(q.join())
print(q.task_done())  # None

print('*****second*****')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.full())  # False 如果队列满了,返回True,反之False
print(q.full) # <bound method Queue.full of <queue.Queue object at 0x000001DA2AC37A90>>
print(q.empty())  # 如果队列为空,返回True,反之False
print(q.queue)
print(q.qsize())
print(q.all_tasks_done)
print(q.join())  # 0 实际上意味着等到队列为空,再执行别的操作
print(q.task_done())  #在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
queue常用方法
原文地址:https://www.cnblogs.com/070727sun/p/10937536.html