开始说并发编程之前,最好有一定的底层知识积累,这里我把需要的知识总结了一下,如果看下面的有不理解的可以看一下:https://www.cnblogs.com/kuxingseng95/p/9418203.html
引子
- 计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。
- 假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。
- 进程就好比工厂的车间,它代表CPU所能处理的单个任务。任一时刻,CPU总是运行一个进程,其他进程处于非运行状态。
- 一个车间里,可以有很多工人。他们协同完成一个任务。
- 线程就好比车间里的工人。一个进程可以包括多个线程。
- 车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是共享的,每个线程都可以使用这些共享内存。
- 可是,每间房间的大小不同,有些房间最多只能容纳一个人,比如厕所。里面有人的时候,其他人就不能进去了。这代表一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。
- 一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫"互斥锁"(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。
- 还有些房间,可以同时容纳n个人,比如厨房。也就是说,如果人数大于n,多出来的人只能在外面等着。这好比某些内存区域,只能供给固定数目的线程使用。
- 这时的解决方法,就是在门口挂n把钥匙。进去的人就取一把钥匙,出来时再把钥匙挂回原处。后到的人发现钥匙架空了,就知道必须在门口排队等着了。这种做法叫做"信号量"(Semaphore),用来保证多个线程不会互相冲突。
不难看出,互斥锁是信号量的一种特殊情况(n=1时)。也就是说,完全可以用后者替代前者。但是,因为互斥锁较为简单,且效率高,所以在必须保证资源独占的情况下,还是采用这种设计。
上面的内容转载自:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
看了上面简单的,再说一下复杂的
进程
进程(英语:process),是计算机中已运行程序的实体。进程为曾经是分时系统的基本运作单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体;在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器。程序本身只是指令、数据及其组织形式的描述,进程才是程序(那些指令和数据)的真正运行实例。若干进程有可能与同一个程序相关系,且每个进程皆可以同步(循序)或异步(平行)的方式独立运行。现代计算机系统可在同一段时间内以进程的形式将多个程序加载到存储器中,并借由时间共享(或称时分复用),以在一个处理器上表现出同时(平行性)运行的感觉。同样的,使用多线程技术(多线程即每一个线程都代表一个进程内的一个独立执行上下文)的操作系统或计算机体系结构,同样程序的平行线程,可在多CPU主机或网络上真正同时运行(在不同的CPU上)。
关于进程在操作系统中的状态及调度流程如下图
简单图:
复杂图:
线程
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
线程是独立调度和分派的基本单位。线程可以为操作系统内核调度的内核线程,如Win32线程;由用户进程自行调度的用户线程,如Linux平台的POSIX Thread;或者由内核与用户进程,如Windows 7的线程,进行混合调度。
同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。
一个进程可以有很多线程,每条线程并行执行不同的任务。
在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率。
总结:
- 一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
- 线程是最小的执行单元,进程是最小的资源单位。
- 进程本质上就是一段程序的运行过程。
-
线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
-
进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行。
- 进程间的切换比线程间的切换要耗时的多得多。
python中的GIL
关于python的GIL锁简单说就是:无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。
由于GIL的存在,python中多线程其实并不是真正的多线程,如果想要充分发挥CPU的资源,在python中大部分情况需要使用多进程。而后面又经过优化,出现了多线程加协程。
在开发中,有两种比较常见的处理情况,一个是IO密集型,一个是计算密集型,由于GIL锁的存在,所以python还是比较适用于IO操作,因为这样能发挥多线程的能力,而要实现计算密集型就需要开多进程,才能够真正的发挥计算机多核的能力。
这里转一下别人的文章,感觉写的很全面,不多说明:http://python.jobbole.com/87743/
多线程
在python中使用的是threading模块,它建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装,提供了更方便的api来处理线程。
调用方式
直接调用
import threading import time def sayhi(num): #定义每个线程要运行的函数 print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例 t1.start() #启动线程,可以认为是让进程进入上面进程图中的就绪状态 t2.start() #启动另一个线程 print(t1.getName()) #获取线程名 print(t2.getName())
注意,函数传入的时候要传入函数对象,也就是不加括号的形式。
继承式调用
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): #继承式调用必须要重写这个函数 print("running on number:%s" %self.num) # self.name是这个线程的名字 time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
线程常用方法
实例对象方法
- join()
- 在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
- setDaemon(Ture)
-
将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。
当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程
完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
-
- run()
- 线程被CPU调度后自动执行线程对象的run方法
- start()
- 启动线程对象,可以认为让其进入就绪状态
- isAlive()
- 返回线程是否活动的
- getName()
- 返回线程名
- setName()
- 设置线程名
threading模块提供的方法
- threading.currentThread():
- 返回当前的线程变量。
- threading.enumerate():
- 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount():
- 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
互斥锁
首先看看下面的代码
import time import threading NUM = 100 # 设定一个共享变量 thread_list = [] def lessNum(): global NUM # 在每个线程中都获取这个全局变量 temp = NUM time.sleep(0.1) NUM = temp - 1 # 对此公共变量进行-1操作 if __name__ == '__main__': for i in range(100): t = threading.Thread(target=lessNum) t.start() thread_list.append(t) for t in thread_list: # 等待所有线程执行完毕 t.join() print('final num:', NUM)
观察:time.sleep(0.1) /0.001/0.0000001 结果分别是多少?
按照我们的预想,应该是100个线程每次减1,然后结果是0,但是实际上不是这样,
当让函数睡0.1秒后,我的电脑的NUM的结果是99,
当让函数睡0.001秒后我的电脑的NUM的结果是90左右,
当让函数睡0.001秒后我的电脑的NUM的结果是也是90左右,
当然了,不同电脑因为运算速度不一样,可能会产生不同的结果。但是无论怎样也不是我们预想的结果0,是因为,当线程开启后,这100个线程操作的是同一个资源,当让函数睡0.1秒后,按照现在大部分电脑的运行速度,足够所有的线程获得那个NUM,而那个NUM的值都是100,减一之后,相当于给这个NUM赋值了100个100-1的结果,自然就是99,后面的0.001,和0.0000001结果也大致是这样的流程,只不过随着时间的减少,线程被cpu调用时需要一定的时间,所以一些线程处理的是其他线程处理之后的结果。
由此可以看出多个线程都在同时操作同一个共享资源,可能造成了资源破坏,怎么办呢?(join会造成串行,失去所线程的意义)
我们可以通过同步锁来解决这种问题,我们将核心的处理共享数据的地方用锁锁住,就像引子中写的那样,同一时间内只让一个线程访问这个资源。
互斥锁的格式为:
lock = threading.Lock()
lock.acquire()
要锁的内容
lock.release()
上面例子中加锁之后的样子
import time import threading NUM = 100 # 设定一个共享变量 thread_list = [] def lessNum(): global NUM # 在每个线程中都获取这个全局变量 lock.acquire() temp = NUM time.sleep(0.0000001) NUM = temp - 1 # 对此公共变量进行-1操作 lock.release() if __name__ == '__main__': lock = threading.Lock() for i in range(100): t = threading.Thread(target=lessNum) t.start() thread_list.append(t) for t in thread_list: # 等待所有线程执行完毕 t.join() print('final num:', NUM)
可以看出来,这样修改了之后就像是串行的了,而这个和python没有关系,其他的编程语言也是这样解决的。那么这样,多线程还有意义吗?答案是肯定的,因为只有这一部分需要加锁,其他部分则不需要。
死锁与递归锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:
import threading, time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name, "gotlockA", time.ctime()) time.sleep(3) lockB.acquire() print(self.name, "gotlockB", time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name, "gotlockB", time.ctime()) time.sleep(2) lockA.acquire() print(self.name, "gotlockA", time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__ == "__main__": lockA = threading.Lock() lockB = threading.Lock() threads = [] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()
结果为
可以看到程序陷入了阻塞,其实是发生了死锁。而有一种常用的解决方式,就是使用递归锁。
递归锁的格式:
r_lock = threading.RLock() r_lock = r_lock.acquire()
要锁的内容 r_lock = r_lock.release()
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。也就是说每调用一次acquire方法就加1,每调用一次release方法就减1,没人用的时候是默认的0。
上面例子中加锁之后的样子
import threading, time class myThread(threading.Thread): def doA(self): r_lock.acquire() print(self.name, "gotlockA", time.ctime()) time.sleep(3) r_lock.acquire() print(self.name, "gotlockB", time.ctime()) r_lock.release() r_lock.release() def doB(self): r_lock.acquire() print(self.name, "gotlockB", time.ctime()) time.sleep(2) r_lock.acquire() print(self.name, "gotlockA", time.ctime()) r_lock.release() r_lock.release() def run(self): self.doA() self.doB() if __name__ == "__main__": r_lock = threading.RLock() threads = [] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()
同步
同步和异步在开始给的网址中已经说过,这里为了方便起见,还是在进行说明
同步和异步通常用来形容一次方法调用。
- 同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。
- 异步方法调用更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的操作。而,异步方法通常会在另外一个线程中,“真实”地执行着。整个过程,不会阻碍调用者的工作。
这里实现同步就是对线程进行阻塞,等待另一个线程将数据处理结束然后解除这个阻塞状态。
创建同步对象:
event = threading.Event()
同步对象的常用方法:
event.wait():等待flag被设定,一旦event被设定,等同于pass
event.set():设定flag
event.clear():清除flag
event.isSet(): 查看当前flag状态
注:一个event可以用在多个线程中。
例子
import threading, time class Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") print(event.isSet()) # False event.set() time.sleep(5) print("BOSS:<22:00>可以下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() # 一旦event被设定,等同于pass print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__ == "__main__": event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() print("ending.....")
信号量
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
例子:
import threading, time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(2) semaphore.release() if __name__ == "__main__": semaphore = threading.Semaphore(5) thrs = [] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
这个信号量默认是1。
不难看出,互斥锁是信号量的一种特殊情况(n=1时)。也就是说,完全可以用后者替代前者。但是,因为互斥锁较为简单,且效率高,所以在必须保证资源独占的情况下,还是采用这种设计。
队列
Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递
创建一个队列对象
import Queue
q = Queue.Queue(maxsize = 10) Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
python Queue模块有三种队列及构造函数
- Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
- LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
- 还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
常用的方法
- q.put(item[, block[, timeout]])
- 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为True。如果队列当前为空且block为True,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为False,put方法将引发Full异常。
- timeout为等待时间
q.get([block[, timeout]])
- 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
- timeout为等待时间
-
q.get_nowait()方法
-
相当Queue.get(False),这种方法在向一个空队列取值的时候会抛一个Empty异常,所以更常用的方法是先判断一个队列是否为空,如果不为空则取值
-
- q.put_nowait(item)
- 相当Queue.put(item, False)
- q.qsize()
- 返回队列的大小
- q.empty()
- 如果队列为空,返回True,反之False
- q.full()
- 如果队列满了,返回True,反之False
- q.full 与 maxsize 大小对应
- q.task_done()
- 消费者线程从队列中get到任务后,任务处理完成,当所有的队列中的任务处理完成后,会使调用queue.join()的线程返回,表示队列中任务以处理完毕。
- 如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
- q.join()
- 阻塞调用线程,直到队列中的所有任务被处理掉。
- 只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
- 实际上意味着等到队列为空,再执行别的操作
除了按照先进先出,还有一个按照优先级的处理顺序
q=queue.PriorityQueue() q.put([5,100]) # 优先级为5,放入参数为100 q.put([7,200]) # 优先级为7,放入参数为200 q.put([3,"hello"]) q.put([4,{"name":"alex"}])
生产者和消费者模型
关于队列,常常跟这个生产者和消费者的模型关联起来,因为两者的契合度很高。所以这里也提一下。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
下面用队列的内容写一个生产者和消费者的例子
import time, random import queue, threading q = queue.Queue() def Producer(name): count = 0 while count < 10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' % (name, count)) count += 1 # q.task_done() # q.join() print("ok......") def Consumer(name): count = 0 while count < 10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() # q.task_done() # q.join() print(data) print('