【python进阶】并发编程-线程进程协程

并发编程-进程、线程、协程

为什么要有操作系统?

操作系统是一个用来协调、管理和控制计算机硬件和软件资源的系统程序,它位于硬件和应用程序之间。

     (程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等。)

操作系统的内核的定义:操作系统的内核是一个管理和控制程序,负责管理计算机的所有物理资源,其中包括:文件系
统、内存管理、设备管理和进程管理。

什么是进程(process)?

进程(process),是计算机中已运行程序的实体,是线程的容器,是最小的资源单位;一个进程至少有一个线程

进程包括三个部分:程序、数据集、进程控制块

假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。是不是在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让
程序B暂停,然后让程序A继续执行?当然没问题,但这里有一个关键词:切换既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与程序B所需要的系统资源(内存,硬盘,键盘等等)是不一样的。自然而然的就需要有一个东西去记录程序A和程序B
分别需要什么资源,怎样去识别程序A和程序B等等,所以就有了一个叫进程的抽象进程定义:
进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系
统感知进程存在的唯一标志。
举一例说明进程:
想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),
而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,
我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他离开时的那一步继续做下去。
进程 详细解释

什么是线程?

线程是操作系统能够进行运算调度的最小单位。线程被包含在进程之中(进程相当于一个容器),是进程中的实际运作单位。

一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使到进程内并发成为可能。
假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,
进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协作涉及到了进程通信问题,而且有共同都需要拥有的东西-------文本内容,不停的切换造成性能上的损失。若有一种机制,可以使
任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带来的性能损耗,那就好了。是的,这种机制就是线程。
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序
计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发
性能。线程没有自己的系统资源。
线程 详细解释

进程与线程的关系和比较

1、一个程序至少有一个进程,一个进程至少有一个线程。(进程可以理解成线程的容器)

2、进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。

3、线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制4、进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。
   线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是它可与同属一个进程的其他的线程共享所在的进程拥有的全部资源. 
   一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行。

    threading模块

一、 线程的两种调用方式

 threading 模块建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装,提供了更方便的api来处理线程。

直接调用:

import threading
import time
 
def sayhi(num): #定义每个线程要运行的函数
    print("running on number:%s" %num)
    time.sleep(3)
 
if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
    t1.start() #启动线程
    t2.start() #启动另一个线程
 
    print(t1.getName()) #获取线程名
    print(t2.getName())

继承式调用:

'''继承式调用(必须重写run()方法)'''
import threading
from time import ctime,sleep

class MyThread(threading.Thread):
    def __init__(self,num,tm):
        threading.Thread.__init__(self)
        self.num = num
        self.tm = tm

    def run(self):
        print("running on number:%s" % self.num,ctime())
        sleep(self.tm)
        print("end running the number",ctime())

threads = []
t1 = MyThread(11,3)
t2 = MyThread(22,5)
threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)
    for t in threads:
        t.start()

    # for t in threads:
    #     t.join()
    t1.join()

    print("end the main threading!")

二 、t1 = threading.Thread()  实例对象方法

t1.join() --->加入主线程 ,   t1.setDaemon() --->守护主线程(与主线程同时退出)

import threading
from time import ctime,sleep

def ListenMusic(name):
        print ("Begin listening to %s. %s" %(name,ctime()))
        sleep(3)
        print("end listening %s"%ctime())

def RecordBlog(title):
        print ("Begin recording the %s! %s" %(title,ctime()))
        sleep(5)
        print('end recording %s'%ctime())

t1 = threading.Thread(target=ListenMusic,args=("小苹果",))
t2 = threading.Thread(target=RecordBlog,args=("cnblog",))

threads = []
threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)
    for t in threads:
        t.start()

    # for t in threads:
    #     t.join()
    t1.join()

    print("the mian thread has done!",ctime())

join()在子线程完成运行之前,主线程将一直被阻塞

setDaemon(True):将线程声明为守护线程,必须在 t1.start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的

join() 方法:当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行。那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出

setDaemon() :只要主线程完成了,不管子线程是否完成,都要和主线程一起退出。

ListenMusic 等待3s

RecordBlog 等待5s

当t2.setDaemon(True)时,t2 为守护进程,t1.join()  t1未完成时 主线程阻塞

其它方法:

threading.Thread() 实例化对象的方法
#
run(): 线程被cpu调度后自动执行线程对象的run方法 # start():启动线程活动。 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading 类模块提供的一些方法: # threading.currentThread()  : 返回当前的线程变量。 # threading.enumerate()  : 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount()  : 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

概念辨析:并行&并发

并发:是指系统具有处理多个任务(动作)的能力(注:cpu处理速度非常快 切换处理多个任务 人为感知不到)

并行:是指系统具有 同时 处理多个任务(动作)的能力

并行是并发的一个子集

概念辨析:同步&异步

当进程执行到一个I/O(等待外部数据)的时候   ---->等:同步  

                                                                            ---->不等:异步(一直等到数据接收完成,再回来处理)

异步的效率较高

三、Python的GIL (Global Interpreter Lock  全局解释器锁)

In CPython, the global interpreter lock(全局解释器锁), or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

 

任务:IO密集型(工作中大多数遇到的情况都是IO密集型的任务)

           计算密集型

结论:

  对于IO密集型任务:Python的多线程是有意义的,可以采用多线程+协程

  对于计算密集型的任务:Python的多线程就不推荐了,Python语言就不适用了。

四、 同步锁(Lock)

 

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    #num-=1

    temp=num
    time.sleep(0.01)
    num =temp-1 #对此公共变量进行-1操作

num = 100  #设定一个共享变量
#生成100个线程
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

#主线程等待所有子线程执行完毕
for t in thread_list: 
    t.join()

print('final num:', num )

观察:time.sleep(0.1)  /0.001/0.0000001 结果分别是多少?

多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?

我们可以通过同步锁来解决这种问题,锁之间的内容被锁定,不允许CPU切换,被锁包住的部分是串行操作,所有编程语言在这里都必须这么做。

R=threading.Lock()

####
def sub():
    global num

    R.acquire()
    temp=num-1
    time.sleep(0.1)
    num=temp
    R.release()

五、线程死锁和递归锁

 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:

'''死锁'''
import time
import threading

class MyThread(threading.Thread):
    def actionA(self):
        lockA.acquire()
        print(self.name,'actionA-gotA',time.ctime())
        time.sleep(2)

        lockB.acquire()
        print(self.name,'actionA-gotB',time.ctime())
        time.sleep(1)

        lockB.release()
        lockA.release()

    def actionB(self):
        lockB.acquire()
        print(self.name, 'actionB-gotB', time.ctime())
        time.sleep(2)

        lockA.acquire()
        print(self.name, 'actionB-gotA', time.ctime())
        time.sleep(1)

        lockA.release()
        lockB.release()

    def run(self):
        self.actionA()
        self.actionB()

if __name__ == '__main__':
    lockA = threading.Lock()
    lockB = threading.Lock()
    thread_list = []

    for i in range(5):
        t = MyThread()  #创建5个线程
        t.start()
        thread_list.append(t)

    for t in thread_list:
        t.join()

    print("ending the main thread!")
死锁

解决办法:使用递归锁

lockA=threading.Lock()
lockB=threading.Lock() #--------------r_lock=threading.RLock()

为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。

直到一个线程所有的acquire都被release,其他的线程才能获得资源。

'''解决死锁办法 ---> 递归锁'''
import time
import threading

class MyThread(threading.Thread):
    def actionA(self):
        # lockA.acquire()
        r_lock.acquire()    #锁数量count=1
        print(self.name,'actionA-gotA',time.ctime())
        time.sleep(2)

        # lockB.acquire()
        r_lock.acquire()    #count=2
        print(self.name,'actionA-gotB',time.ctime())
        time.sleep(1)

        # lockB.release()
        # lockA.release()
        r_lock.release()    #锁数量count=1
        r_lock.release()    #锁数量count=0

    def actionB(self):
        # lockB.acquire()
        r_lock.acquire()    #锁数量count=1
        print(self.name, 'actionB-gotB', time.ctime())
        time.sleep(2)

        # lockA.acquire()
        r_lock.acquire()    #锁数量count=2
        print(self.name, 'actionB-gotA', time.ctime())
        time.sleep(1)

        # lockA.release()
        # lockB.release()
        r_lock.release()    #锁数量count=1
        r_lock.release()    #锁数量count=0

    def run(self):
        self.actionA()
        self.actionB()

if __name__ == '__main__':
    # lockA = threading.Lock()
    # lockB = threading.Lock()
    r_lock = threading.RLock()
    thread_list = []

    for i in range(5):
        t = MyThread()  #创建5个线程
        t.start()
        thread_list.append(t)

    for t in thread_list:
        t.join()

    print("ending the main thread!")
递归锁

条件变量同步(Condition) ---有点麻烦不怎么用的

有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,

还提供了 wait()、notify()、notifyAll()方法。

lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传入锁,对象自动创建一个RLock()。

wait()      :条件不满足时调用,线程会释放锁并进入等待阻塞;
notify()    :条件创造后调用,通知等待池激活一个线程;
notifyAll()  :条件创造后调用,通知等待池激活所有线程。

实例

同步条件(Event)

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。

对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。

一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行条件同步和条件变量同步差不多意思,只是少了锁功能。

event=threading.Event():条件环境对象,初始值 为False

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

 案例:

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。",time.ctime())
        print(event.isSet())    #False
        event.isSet() or event.set()
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。",time.ctime())
        event.isSet() or event.set()

class Worker(threading.Thread):
    def run(self):
        event.wait()    #一旦event被设定 pass
        print("Worker:哎……命苦啊!")
        time.sleep(0.25)
        event.clear()   #清除event
        event.wait()
        print("Worker:OhYeah!")

if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print("end。。。")
老板-工人案例
import threading,time
import random
def light():
    if not event.isSet():
        event.set()     #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('33[42;1m--green light on---33[0m')
        elif count <13:
            print('33[43;1m--yellow light on---33[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('33[41;1m--red light on---33[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1

def car(n):
    while True:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)

if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(5):
        t = threading.Thread(target=car,args=(i,))
        t.start()
红绿灯

信号量(Semaphore) ---其实也是一把锁

 信号量用来控制线程并发数的(相当于停车位一次空出来5个 允许5辆车同时进入)

BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

实例:

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(3)
            semaphore.release()

if __name__=="__main__":
    semaphore=threading.Semaphore(5)

    thread_list=[]
    for i in range(100):
        thread_list.append(myThread())
    for t in thread_list:
        t.start()
View Code

多线程利器(队列 queue)

列表是不安全的数据结构

import threading,time

li = [1, 2, 3, 4, 5]
def pri():
    while li:
        a = li[-1]
        print(a)
        time.sleep(1)
        li.remove(a)        #list.remove(x): x not in list,可能两个线程同时拿到4,线程A睡了1s 线程B删掉了列表中的4,线程A醒了 再来删除4 会报错
        
t1 = threading.Thread(target=pri, args=())
t1.start()
t2 = threading.Thread(target=pri, args=())
t2.start()
View Code

 Python的队列模块queue 有三种模式:

第一种情况:q = queue.Queue(),FIFO(first in first out)先进先出

import queue

# q = queue.Queue()      #在其他语言的数据结构中队列的默认是先进先出,这里Python将队列 堆栈 几种封装到一起了,FIFO(first in first out)
q = queue.Queue(5)      #设置队列maxsize=5

'''开始往队列中塞值'''
q.put(12)
q.put("hello")
q.put([11,22,"world"])
q.put({"name":"xiong"})
q.put((123,456,789,))
q.put('jaklfj')     #队列已满,会阻塞。
# q.put('jaklfj',block=False)     #block设为False时 队列满了再塞值会报错

while True:
    data = q.get()  #取出5个值之后 阻塞
    # data = q.get(block=False)  #block设为False时 队列空了再取值会报错
    print(data)
    print("----------------------->")

第二种情况:q = queue.LifoQueue()     Last in first out 后进先出(即先进后出)

import queue

q = queue.LifoQueue(5)
q.put(12)
q.put("hello")
q.put([11,22,"world"])
q.put({"name":"xiong"})
q.put((123,456,789,))

while True:
    data = q.get()  #取出5个值之后 阻塞
    print(data)
    print("----------------------->")

第三种情况:q = queue.PriorityQueue()   优先级队列

import queue

q = queue.PriorityQueue()
q.put([3,12])
q.put([5,"hello"])
q.put([1,[11,22,"world"]])      #优先级最高,第1个出来
q.put([2,{"name":"xiong"}])
q.put([4,(123,456,789,)])

while True:
    data = q.get()  #取出5个值之后 阻塞
    print(data)
    print('优先级为->',data[0],' 数据为->',data[1])
    print("----------------------->")

队列类实例对象的一些方法总结:

创建一个“队列”对象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。             class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。   class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
  q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait()   相当q.get(block=False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item)   相当q.put(item, block=False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

实例:

案例1:

'''案例1'''
import threading,queue
from time import sleep
from random import randint

class Production(threading.Thread):
    def run(self):
        while True:
            r=randint(0,10)
            q.put(r)
            print("生产出来%s号包子"%r)
            sleep(5)

class Proces(threading.Thread):
    def run(self):
        while True:
            re=q.get()
            print("吃掉%s号包子。。。"%re)
            sleep(2)

if __name__=="__main__":
    q=queue.Queue(10)

    threads=[Production(),Production(),Production(),Proces()]
    for t in threads:
        t.start()
'''案例1'''

案例2:

'''案例2'''
import time, random
import queue, threading

q = queue.Queue()

def Producer(name):
    count = 1
    while count < 11:
        time.sleep(random.randrange(5))  # 0 1 2 3 4
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1

def Consumer(name):
    count = 1
    while count < 11:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            # print(data)
            print('33[32;1mConsumer %s has eat %s baozi...33[0m' % (name, data))
            time.sleep(2)
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('厨子A',))
c1 = threading.Thread(target=Consumer, args=('顾客B',))
c2 = threading.Thread(target=Consumer, args=('顾客C',))
c3 = threading.Thread(target=Consumer, args=('顾客D',))
p1.start()
c1.start()
c2.start()
c3.start()
'''案例2'''

案例3:终极版-生产者消费者模型

聊一聊什么是生产者 消费者 和线程队列的联系

为什么要使用生产者和消费者模式??

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式??

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
'''案例3:增强版 生产者消费者模型  q.join()  q.task_done() '''
import time, random
import queue, threading

q = queue.Queue()

def Producer(name):
    count = 0
    while count < 10:
        print('making。。。。。')
        time.sleep(random.randrange(5))  # 0 1 2 3 4
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1
        q.join()  # 厨子收到顾客吃完包子的信号 又继续开始做包子
        print("ok.....")

def Consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(4))
        data = q.get()
        print('eating>>>')
        time.sleep(3)
        print('33[32;1mConsumer %s has eaten the %s baozi...33[0m' % (name, data))
        q.task_done()   #顾客吃完了,给厨子一个信号

        count += 1

p1 = threading.Thread(target=Producer, args=('厨子A',))
c1 = threading.Thread(target=Consumer, args=('顾客B',))
c2 = threading.Thread(target=Consumer, args=('顾客C',))
c3 = threading.Thread(target=Consumer, args=('顾客D',))
p1.start()
c1.start()
c2.start()
c3.start()
View Code

案例4:

实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块)
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
import random, threading, time
from queue import Queue

# Producer thread
class Producer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data = queue

    def run(self):
        for i in range(10):  # 随机产生10个数字 ,可以修改为任意大小
            randomnum = random.randint(1, 99)
            print("%s: %s is producing 33[1;31;40m %d33[0m to the queue!" % (time.ctime(), self.getName(), randomnum))
            self.data.put(randomnum)  # 将数据依次存入队列
            time.sleep(1)
        print("%s: %s finished!" % (time.ctime(), self.getName()))

# Consumer thread
class Consumer_even(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data = queue

    def run(self):
        while 1:
            try:
                val_even = self.data.get(1, 5)  # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
                if val_even % 2 == 0:
                    print("%s: %s is consuming. 33[1;32;47m %d33[0m in the queue is consumed!" % (time.ctime(), self.getName(), val_even))
                    time.sleep(2)
                else:
                    self.data.put(val_even)
                    time.sleep(2)
            except:  # 等待输入,超过5秒 就报异常
                print("%s: %s finished!" % (time.ctime(), self.getName()))
                break

class Consumer_odd(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data = queue
    def run(self):
        while 1:
            try:
                val_odd = self.data.get(1, 5)
                if val_odd % 2 != 0:
                    print("%s: %s is consuming. 33[1;35;43m %d33[0m in the queue is consumed!" % (time.ctime(), self.getName(), val_odd))
                    time.sleep(2)
                else:
                    self.data.put(val_odd)
                    time.sleep(2)
            except:
                print("%s: %s finished!" % (time.ctime(), self.getName()))
                break

# Main thread
def main():
    queue = Queue()
    producer = Producer('Pro.', queue)
    consumer_even = Consumer_even('Con_even.', queue)
    consumer_odd = Consumer_odd('Con_odd.', queue)
    producer.start()
    consumer_even.start()
    consumer_odd.start()
    producer.join()
    consumer_even.join()
    consumer_odd.join()
    print('All threads terminate!')

if __name__ == '__main__':
    main()
'''案例4'''

多进程模块 multiprocessing

进程的理论基础参考:https://www.cnblogs.com/haiyan123/p/7423178.html

https://www.cnblogs.com/yuanchenqi/articles/6248025.html

  Multiprocessing is a package that supports spawning processes using an API similar to the threading module.

The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

  由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。multiprocessing包是Python中的多进程管理包。

与threading.Thread 类似,它可以利用multiprocessing.Process 对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。

此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。

所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

Process类的介绍

1.创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

2.参数介绍

group  参数未使用,值始终为None

target  表示调用对象,即子进程要执行的任务

args  表示调用对象的位置参数元组,args=(1,2,'egon',)

kwargs  表示调用对象的字典,kwargs={'name':'egon','age':18}

name  为进程的名称(如果自己定义类 则名称为类名-1)

3.实例方法介绍

p.start(): 启动进程,并调用该子进程中的p.run() 
p.run(): 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  

p.terminate(): 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。
  如果p还保存了一个锁那么也将不会被释放,进而导致死锁

p.is_alive(): 如果p仍然运行,返回True p.join([timeout]): 主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间, 需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

4、属性

deamon:为守护进程和线程的setDeamon一样
name :进程名字
pid :进程号
ppid :父进程号

Process类的使用

一定要把开进程的代码写在if __name__=='__main__':下面

开一个进程和主进程是并发的关系,我start一下就是先告诉操作系统我要开一个进程
,然而它不会等待,他会去执行下面的代码,完了他吧进程开始后,就开始执行了

strat(): 方法的功能

  1.开启进程
  2.执行功能

开启进程的两种方式

from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing'%name)
    time.sleep(random.randint(1,3))
    print('%s is piao end'%name)
if __name__ =='__main__':
    p1 = Process(target=piao,kwargs={'name':'alex'})
    p2 = Process(target=piao,kwargs={'name':'alex'})
    p3 = Process(target=piao,kwargs={'name':'alex'})
    p1.start()
    p2.start()
    p3.start()
    print('主进程')
第一种方式
from multiprocessing import Process
import time
import random
import os
class Piao(Process):
    def __init__(self,name):
        super().__init__() #必须继承父类的一些属性
        self.name = name
    def run(self):  #必须得实现一个run方法
        print(os.getppid(),os.getpid())
        print('%s is piaoing'%self.name)
        time.sleep(random.randint(1,3))
        print('%s is piao end'%self.name)
if __name__ =='__main__':
    p1 = Piao('alex')
    p2 = Piao('wupeiqi')
    p3 = Piao('yuanhao')
    p1.start()
    p2.start()
    p3.start()
    print('主进程',os.getpid())
第二种方式

getppid()  #父进程id
getpid()   #当前进程id

多进程实现套接字并发

"""多进程实现套接字并发-服务端"""
from socket import *
from multiprocessing import Process

s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', 8081))
s.listen(5)
print('start running...')

def talk(coon, addr):
    while True:
        try:
            data = coon.recv(1024)
            print('收到客户端消息>>>',data.decode("utf8"))
            if not data: break
            coon.send(data.upper())
        except Exception:
            break
    coon.close()

if __name__ == '__main__':
    while True:
        coon, addr = s.accept()
        print(coon, addr)
        p = Process(target=talk, args=(coon, addr))
        print(p.name)
        p.start()

    s.close()
服务端
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
    cmd = input('>>:').strip()
    if not cmd:continue
    info = "客户端[1]发来消息--->"+cmd
    c.send(info.encode('utf-8'))
    data = c.recv(1024)
    print('收到服务端回复>>>',data.decode('utf-8'))

c.close()
客户端1
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
    cmd = input('>>:').strip()
    if not cmd:continue
    info = "客户端[2]发来消息--->"+cmd
    c.send(info.encode('utf-8'))
    data = c.recv(1024)
    print('收到服务端回复>>>',data.decode('utf-8'))

c.close()
客户端2

进程间通讯

1、进程间的通信应该尽量避免共享数据的方式

2、进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的。

虽然进程间数据独立,但可以用 Manager实现数据共享,事实上Manager的功能远不止于此。

进程队列Queue

from multiprocessing import Process, Queue
import queue

def foo(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=foo, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
View Code

管道

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example: 

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(conn))


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()    #双向管道,可发 可收
    print('主进程--->',parent_conn)
    print("q_ID1:",id(child_conn))      #主进程调用,与子进程中q_ID2 不一样
    p = Process(target=f, args=(child_conn,))
    p.start()

    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("儿子你好!")
    p.join()
View Code

Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(以及其他方法)。

请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。 当然,同时使用管道的不同端的进程不存在损坏的风险。

Managers --->实现进程间数据共享

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

manager 返回的 Manager()对象 控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

manager 返回的 Manager()对象 将支持类型有  list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array。 例如:

from multiprocessing import Process, Manager,current_process

def f(d, l,n):
    d[n] = '1'      #第一个子进程处理之后:{0,"1"}
    d['2'] = 2      #第一个子进程处理之后:{0:"1",'2':2}
    l.append(n)     #第一个子进程处理之后:[0,1,2,3,4,0]
    #print(l)
    print(current_process())    #当前的进程
    print("son process:",id(d),id(l))

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #创建manager字典对象 {}

        l = manager.list(range(5))  #[0,1,2,3,4]

        print("main process:",id(d),id(l))

        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)        #子进程 修改了字典内容,主进程可以直接查看到,即实现了进程间数据共享
        print(l)
View Code

进程同步

不使用来自不同进程的锁 输出很容易混淆

# 屏幕输出 不使用锁 会混乱,Python2版本更明显
from multiprocessing import Process, Lock

def f(l, i):
    # l.acquire()
    print('hello world %s' % i)
    # l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        p = Process(target=f, args=(lock, num))
        p.start()
View Code

Python并发编程之进程池,线程池,协程

 参考:https://www.cnblogs.com/haiyan123/p/7461294.html

需要注意一下
不能无限的开进程,不能无限的开线程
最常用的就是开进程池,开线程池。其中回调函数非常重要
回调函数其实可以作为一种编程思想,谁好了谁就去掉

只要你用并发,就会有锁的问题,但是你不能一直去自己加锁吧
那么我们就用QUEUE,这样还解决了自动加锁的问题
由Queue延伸出的一个点也非常重要的概念。以后写程序也会用到
这个思想。就是生产者与消费者问题

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  • 很明显需要并发执行的任务通常要远大于核数
  • 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  • 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

那么什么是进程池呢?进程池就是控制进程数目

ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。 

进程池的结构:

 创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

1、创建进程池

Pool([numprocess  [,initializer [, initargs]]])  :创建进程池

2、参数介绍

numprocess:要创建的进程数,如果省略,将默认为cpu_count()的值,可os.cpu_count()查看

initializer:是每个工作进程启动时要执行的可调用对象,默认为None
initargs:是要传给initializer的参数组

3、方法介绍

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。
如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
  
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,
callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
    
p.close():关闭进程池,防止进一步操作。禁止往进程池内在添加任务(需要注意的是一定要写在close()的上方)
P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

4、应用1:

'''apply同步进程池(阻塞)(串行)'''
from multiprocessing import Pool
import os,time

def task(n):
    print('[%s] is running'%os.getpid())
    time.sleep(2)
    print('[%s] is done'%os.getpid())
    return n**2

if __name__ == '__main__':
    # print(os.cpu_count())  #查看cpu个数
    p = Pool(4)     #最大四个进程
    print("》》》",os.getpid())
    for i in range(1,7):    #开6个任务
        res = p.apply(task,args=(i,))  #同步的,等着一个运行完才执行另一个
        print('本次任务的结束:%s'%res)

    p.close()   #禁止往进程池内在添加任务
    p.join()    #在等进程池
    print('end main process...')
apply同步进程池(阻塞)(串行)
"""apply_async异步进程池(非阻塞)(并行)"""
# ----------------
# 那么我们为什么要用进程池呢?这是因为进程池使用来控制进程数目的,
# 我们需要几个就开几个进程。如果不用进程池实现并发的话,会开很多的进程
# 如果你开的进程特别多,那么你的机器就会很卡,所以我们把进程控制好,用几个就开几个,也不会太占用内存
from multiprocessing import Pool
import os,time

def walk(n):
    print('[%s] is running' % os.getpid())
    time.sleep(2)
    print('[%s] is done' % os.getpid())
    return n ** 2

if __name__ == '__main__':
     t1 = time.time()
     p = Pool(4)
     res_obj_l = []
     for i in range(10):
         res = p.apply_async(walk,args=(i,))
         # print(res)  #打印出来的是对象
         res_obj_l.append(res)  #那么现在拿到的是一个列表,怎么得到值呢?我们用个.get方法

     p.close() #禁止往进程池里添加任务
     p.join()
     # print(res_obj_l)
     print([obj.get() for obj in res_obj_l])  #这样就得到了
     print('end main process...')
     t2 = time.time()
     print(t2-t1)
apply_async异步进程池(非阻塞)(并行)

   

那么什么是同步,什么是异步呢?

同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去

异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。

什么是串行,什么是并行呢?

举例:能并排开几辆车的就可以说是“并行”,只能一辆一辆开的就属于“串行”了。很明显,并行的速度要比串行的快得多。(并行互不影响,串行的等着一个完了才能接着另一个)

5、应用2:

使用进程池维护固定数目的进程(以前客户端和服务端的改进)

from socket import *
from multiprocessing import Pool
s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用
s.bind(('127.0.0.1',8081))
s.listen(5)
print('start running...')

def talk(coon,addr):
    while True:  # 通信循环
        try:
            cmd = coon.recv(1024)
            print('客户端[%s]消息:'%coon ,cmd.decode('utf-8'))
            if not cmd: break
            coon.send(cmd.upper())
            print('发送的是%s'%cmd.upper().decode('utf-8'))
        except Exception:
            break
    coon.close()

if __name__ == '__main__':
    p = Pool(4)
    while True:#链接循环
        coon,addr = s.accept()
        print(coon,"
",addr)
        p.apply_async(talk,args=(coon,addr))
    s.close()
#因为是循环,所以就不用p.join了
服务端
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
    cmd = input('client1>>:').strip()
    if not cmd:continue
    c.send(cmd.encode('utf-8'))
    data = c.recv(1024)
    print('接受server的是%s'%data.decode('utf-8'))

c.close()
客户端

回调函数:

链接:https://www.cnblogs.com/XJT2018/p/10938386.html

Python中的上下文管理器(contextlib模块)

链接:

协程

协程:单线程下实现并发(提高效率)。协程又称微线程,纤程。英文名Coroutine。

说到协成,我们先说一下协程联想到的知识点

切换关键的一点是:保存状态(从原来停留的地方继续切)

return:只能执行一次,结束函数的标志
yield:函数中但凡有yield,这个函数的执行结果就变成了一个生成器,
生成器本质就是一个迭代器,那么迭代器怎么用呢?用一个next()方法
 
1.yield语句的形式:yield 1
yield功能1:可以用来返回值,可以返回多次值
yield功能2:可以吧函数暂停住,保存原来的状态
 
2.yield表达式的形式:x = yieldsend可以吧一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换
send()要想用就得先next()一下
但是要用send至少要用两个yield  

参考yield模块专题:

优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

yield的简单实现

import time
import queue

def consumer(name):
    print("--->ready to eat baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        # time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while 1:
        time.sleep(3)
        print("33[32;1m[producer]33[0m is making baozi %s and %s" %(n,n+1) )
        con.send(n)
        con2.send(n+1)

        n +=2

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    # print(con)
    # print(con2)
    producer()
View Code

Greenlet

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
View Code

参考:http://www.imooc.com/article/68942

https://www.cnblogs.com/yuanchenqi/articles/5733873.html

https://www.cnblogs.com/yuanchenqi/articles/6248025.html

原文地址:https://www.cnblogs.com/XJT2018/p/10154383.html