python进程、线程、协程

每次执行程序(比如说浏览器,音乐播放器)的时候都会完成一定的功能,比如说浏览器帮我们打开网页。 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。进程的创建、撤销和切换的开销比较大


线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,线程的引入减小了程序并发执行时的开销。线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。

协程,又称微线程,线程是系统级别的它们由操作系统调度,而协程则是程序级别的由程序根据需要自己调度。在一个线程中会有很多函数,我们把这些函数称为子程序,在子程序执行过程中可以中断去执行别的子程序,而别的子程序也可以中断回来继续执行之前的子程序,这个过程就称为协程。也就是说在同一线程内一段代码在执行过程中会中断然后跳转执行别的代码,接着在之前中断的地方继续开始执行,类似与yield操作。协程是一中多任务实现方式,它不需要多个进程或线程就可以实现多任务。

multiprocessing是python的多进程管理包。

threading 模块建立在 _thread 模块之上。_thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

greenlet、gevent(第三方模块)可以实现协程

python 进程

进程

程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。

进程是一个“执行中的程序”,进程的实质是程序的一次执行过程,进程是动态产生,动态消亡的。进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;进程由程序、数据和进程控制块三部分组成。由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进

 

进程调度

先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可以作业调度,也可以作用域进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型作业(进程)。

短作业(进程)优先调度算法(SJ/PF)是指对短作业或者短进程优先调度的算法,该算法既可以用于作业调度,也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。

时间片轮转(Round Robin,RR)将CPU的处理时间分成固定大小的时间片,如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。

进程的并行与并发

并行:并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核CPU)

并发:并行是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A,交替使用,目的是提高效率。

区别:

并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。

并发是从宏观上,在一个时间段上可以看出是同时执行,比如一个服务器同时处理多个session。

同步/异步

同步:所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

异步:所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

复制代码
比如我去银行办理业务,可能会有两种方式:
第一种 :选择排队等候;
第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;

第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;
第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,
在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。
复制代码

阻塞/非阻塞 

阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

复制代码
不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在
该函数调用处不能继续往下执行。相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息
通知上,而是一边做自己的事情一边等待。 注意:同步非阻塞形式实际上是效率低下的,想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有。如果把打电话和观察排队的位置看成是程序的两个操作的话,这个
程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的;而异步非阻塞形式却没有这样的问题,因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)
的事情,程序没有在两种不同的操作中来回切换。
复制代码

同步/异步与阻塞/非阻塞

(1)同步阻塞形式

  效率最低。拿上面的举例来说,就是你专心排队,什么别的事都不做

(2)异步阻塞形式

  如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行去做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面;异步操作也可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知书时被阻塞。

(3)同步非阻塞形式

  实际上是效率低下的。想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

(4)异步非阻塞形式

  效率更高,因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来

很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞

1、multiprocessing模块

python中的多线程无法利用多核优势,如果想要充分的使用CPU资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python中提供了multiprocess模块。multiprocess中几乎包含了和进程有关的所有子模块。大致分为四个部分:创建进程部分、进程同步部分、进程池部分、进程之间数据共享。multiprocessing常用组件及功能:
1.1、管理进程模块:

  • Process(用于创建进程模块)
  • Pool(用于创建管理进程池)
  • Queue(用于进程通信,资源共享)
  • Value,Array(用于进程通信,资源共享)
  • Pipe(用于管道通信)
  • Manager(用于资源共享)

1.2、同步子进程模块:

  • Condition
  • Event
  • Lock
  • RLock
  • Semaphore

2、Array,Value---共享数据

如果你真有需要共享数据, multiprocessing提供了两种方式。

(1)multiprocessing,Array,Value

数据可以用Value或Array存储在一个共享内存地图里,如下:

from multiprocessing importArray,Value,Process
 
def func(a,b):
    a.value = 3.333333333333333
    for i in range(len(b)):
        b[i] = -b[i]
 
 
if __name__ == "__main__":
    num = Value('d',0.0)
    arr = Array('i',range(11))
 
 
    c = Process(target=func,args=(num,arr))
    d= Process(target=func,args=(num,arr))
    c.start()
    d.start()
    c.join()
    d.join()
 
    print(num.value)
    for i in arr:
        print(i)<br>
#输出:
#3.1415927
#[0, -1, -2,-3, -4, -5, -6, -7, -8, -9]

创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。

Array(‘i’, range(10))中的‘i’参数:

‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double

(2)Manager

由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持

from multiprocessing importProcess,Manager
def f(d,l):
    d["name"] = "zhangyanlin"
    d["age"] = 18
    d["Job"] = "pythoner"
    l.reverse()
 
if __name__ == "__main__":
    with Manager() as man:
        d = man.dict()
        l = man.list(range(10))
 
        p = Process(target=f,args=(d,l))
        p.start()
        p.join()
 
        print(d)
        print(l)
#输出:
#{0.25: None, 1: '1', '2': 2}
#[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

3、multiprocess.Process模块介绍

Process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

复制代码
Process([group [, target [, name [, args [, kwargs]]]]])

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称
复制代码

方法介绍:

复制代码
1 p.start():启动进程,并调用该子进程中的p.run() 
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  
复制代码

属性介绍:

复制代码
 
1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程。设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性(了解即可)
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),因此用process()直接创建子进程会无限递归创建子。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候  ,就不会递归运行了。
复制代码

Process模块创建并开启子进程的两种方式

复制代码
#方法一;
import os
from multiprocessing import Process

def func1(name):
    print('hello', name)
    print("我是子进程: %d;我的父进程id是:%d" % (os.getpid(), os.getppid()))

def func2():
    print('hello')

if __name__ == '__main__': 
  p1 = Process(target=func1, args=('xiaobai',)) # 此处传参必须是元组数据类型 
  p1.start() 
      print("我是父进程:%d" % os.getpid()) 

      p2 = Process(target=func2)
      p2.start() 

''' 
# 执行结果 
我是父进程:12612 
hello xiaobai 
我是子进程: 5760;
我的父进程id是:12612
 '''  
# 方法二:# 通过继承Process类的形式开启进程的方式 import os from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): #固定名字run !!! print(os.getpid()) print('%s 正在和女神聊天' % self.name) if __name__ == '__main__': p1 = MyProcess('xiaobai') p2 = MyProcess('xiaohei') p1.start() # start会自动调用run方法 p2.start() # 说明:如果需要传参,必须写入到__init__方法里面,且必须加上super().__init__();因为父类Process里面也有__init__方法。
复制代码

Process对象的join方法

复制代码
import time
from multiprocessing import Process

def func(name):
    print("hello", name)
    time.sleep(1)
    print('我是子进程')


if __name__ == '__main__':
    p = Process(target=func, args=('xiaobai',))
    p.start()
    p.join()    # 加上join方法后,父进程就会阻塞等待子进程结束而结束。
    print("父进程")
复制代码

Process开启多进程

多个进程同事运行(注意,子进程的执行顺序不是根据自动顺序决定的)

复制代码
import time
from multiprocessing import Process

def func(name):
    print("hello 进程 %d" % name )
    time.sleep(1)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=func, args=(i,))
        p.start()
复制代码
复制代码
import time
from multiprocessing import Process

def func(name):
    print("hello 进程 %d" % name )
    time.sleep(0.1)

if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = Process(target=func, args=(i,))
        p.start()
        p_lst.append(p)
        p.join()
    print("父进程执行中")
复制代码

进程之间的数据隔离问题

复制代码
from multiprocessing import Process

n = 100     #在windows系统中把全局变量定义在if __name__ == '__main__'之上就可以了
def work():
    global n
    n = 0
    print("子进程内:", n)

if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    print("主进程内:", n)
复制代码

守护进程

主进程创建守护进程,守护进程会随着主进程的结束而结束。守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

复制代码
import time
from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    time.sleep(0.1)
    print("main-------")
#打印该行则主进程代码结束,则守护进程p1应该被终止.
#可能p1执行的打印信息任务会因为主进程打印(main----)被终止.
复制代码

socket聊天并发实例

复制代码
from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start进程一定要写到这下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()

使用多进程实现socket聊天并发-server
复制代码
复制代码
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

使用多进程实现socket聊天并发-client
复制代码

进程同步(锁)multiprocess.Lock

当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

复制代码
# 多进程抢占输出资源
import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p = Process(target=work, args=(i,))
        p.start()

# 执行结果
"""
0: 14316 is running
1: 9900 is running
2: 10056 is running
1: 9900 is done
2: 10056 is done
0: 14316 is done
"""
复制代码
复制代码
# 使用锁维护执行顺序
import os
import time
import random
from multiprocessing import Process, Lock

def work(lock, n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=work, args=(lock, i))
        p.start()

# 执行结果
"""
0: 15276 is running
0: 15276 is done
1: 6360 is running
1: 6360 is done
2: 14776 is running
2: 14776 is done
"""
复制代码

上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,没错,加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行修改,速度是慢了,但牺牲了速度却保证了数据的安全性。因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题,这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道

队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

4、进程池(Using a pool of workers)

Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:

#apply
from  multiprocessing import Pool
import time
 
def f1(i):
    time.sleep(0.5)
    print(i)
    return i + 100
 
if __name__ == "__main__":
    pool = Pool(5)
    for i in range(1,31):
        pool.apply(func=f1,args=(i,))
 
#apply_async
def f1(i):
    time.sleep(0.5)
    print(i)
    return i + 100
def f2(arg):
    print(arg)
 
if __name__ == "__main__":
    pool = Pool(5)
    for i in range(1,31):
        pool.apply_async(func=f1,args=(i,),callback=f2)
    pool.close()
    pool.join()

一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。

  • processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

注意:Pool对象的方法只可以被创建pool的进程所调用。

进程池的方法

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : 是apply()的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

  • close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

  • map(func, iterable[, chunksize])

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])

  • imap(func, iterable[, chunksize])

  • imap_unordered(func, iterable[, chunksize])

  • starmap(func, iterable[, chunksize])

  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

python 线程

1、threading模块

threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。Thread方法:

t.start() : 激活线程,

t.getName() : 获取线程的名称

t.setName() : 设置线程的名称 

t.name : 获取或设置线程的名称

t.is_alive() : 判断线程是否为激活状态

t.isAlive() :判断线程是否为激活状态

t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

t.isDaemon() : 判断是否为守护线程

t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

t.run() :线程被cpu调度后自动执行线程对象的run方法

from threading import Thread
from threading import currentThread  # 获取当前线程对象的 对象
import time


def task():
    print('%s is runing' %currentThread().getName())  # 获取线程名
    time.sleep(2)
    print('%s is down' % currentThread().getName())


if __name__ == '__main__':
    t = Thread(target=task, name='这里设置子线程初始化名')
    t.start()
    t.setName('设置线程名')  # !!!!
    t.join()  # 等待子线程运行结束
    # currentThread() 等同于 线程对象t  所以获取线程名也可以t.getName()
    print('主线程', currentThread().getName())
    # 但在主线程内(并没有线程对象)要获取线程名必须用 currentThread().getName()
    t.isAlive()  # 线程是否存活! 查看线程对象是否存活

python 线程的两种开启方法

#方法1
from threading import Thread  # 创建线程的模块

def task(name):
    print(name)

if __name__ == '__main__':  
    # 开启线程  参数1:方法名(不要带括号)   参数2:参数(元祖)      返回对象
    p = Thread(target=task, args=('线程1',))
    p.start()  # 只是给操作系统发送了一个就绪信号,并不是执行。操作系统接收信号后安排cpu运行

    print('')


#方法2 - 类的方法
from threading import Thread  # 创建线程的模块

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 固定名字run !!!必须用固定名
        print(self.name)

if __name__ == '__main__':  # 必须要这样启动 
    p = MyThread('子线程1')
    p.start()
    print('主)

2、线程锁threading.RLock和threading.Lock

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。所以可能出现如下问题:

例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。

import threading
import time
  
globals_num = 0
  
lock = threading.RLock()
  
def Func():
    lock.acquire()  # 获得锁
    global globals_num
    globals_num += 1
    time.sleep(1)
    print(globals_num)
    lock.release()  # 释放锁
  
for i in range(10):
    t =threading.Thread(target=Func)
    t.start()

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

3、threading.Event

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
  • Event.isSet() :判断标识位是否为Ture。
import threading
  
def do(event):
    print('start')
    event.wait()
    print('execute')
  
event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()
  
event_obj.clear()
inp = input('input:')
if inp == 'true':
    event_obj.set()

当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

5、threading.Condition

Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。

如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。 notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。

 例子: 生产者-消费者模型,

import threading
import time
def consumer(cond):
    with cond:
        print("consumer before wait")
        cond.wait()
        print("consumer after wait")
  
def producer(cond):
    with cond:
        print("producer before notifyAll")
        cond.notifyAll()
        print("producer after notifyAll")
  
condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
  
p = threading.Thread(name="p", target=producer, args=(condition,))
  
c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

5、queue模块

Queue 就是对队列,它是线程安全的,,举例来说,我们去麦当劳吃饭。饭店里面有厨师职位,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。形成管道样,厨师做好饭通过前台传送给顾客,所谓单向队列

这个模型也叫生产者-消费者模型。

import queue
 
q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
 
q.join()    # 等到队列为kong的时候,在执行别的操作
q.qsize()   # 返回队列的大小 (不可靠)
q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
                         为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
                          如果队列无法给出放入item的位置,则引发 queue.Full 异常
q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
                      若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
q.put_nowait(item) #   等效于 put(item,block=False)
q.get_nowait() #    等效于 get(item,block=False)

代码如下:

#!/usr/bin/env python
import Queue
import threading
 
message = Queue.Queue(10)
 
def producer(i):
    while True:
        message.put(i)
 
 
def consumer(i):
    while True:
        msg = message.get()
 
for i in range(12):
    t =threading.Thread(target=producer, args=(i,))
    t.start()
 
for i in range(10):
    t =threading.Thread(target=consumer, args=(i,))
    t.start()

 

 

python 协程

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

1.通过yield实现协程:

import time

def A():
    while 1:
        print('------A-----')
        time.sleep(0.1)
        yield()

def B():
    while 1:
        print('-------B-----')
        time.sleep(0.1)
        next(a)

a = A()
B()

执行结果:

-------B-----
------A-----
-------B-----
------A-----
-------B-----
------A-----
-------B-----
------A-----
-------B-----
------A-----
··· 

2.通过greenlet实现协程:

yield能实现协程,不过实现过程不易于理解,greenlet是在这方面做了改进。

from greenlet import greenlet
import time

def A():
    while 1:
        print('-------A-------')
        time.sleep(0.5)
        g2.switch()

def B():
    while 1:
        print('-------B-------')
        time.sleep(0.5)
        g1.switch()

g1 = greenlet(A)  #创建协程g1
g2 = greenlet(B)

g1.switch()  #跳转至协程g1

执行结果:

-------A-------
-------B-------
-------A-------
-------B-------
-------A-------
···

3.通过gevent实现协程:

greenlet可以实现协程,不过每一次都要人为的去指向下一个该执行的协程,显得太过麻烦。gevent每次遇到io操作,需要耗时等待时,会自动跳到下一个协程继续执行。

gevent 是一个第三方库,可以轻松通过gevent实现协程程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。gevent会主动识别程序内部的IO操作,当子程序遇到IO后,切换到别的子程序。如果所有的子程序都进入IO,则阻塞。

import gevent

def A():
    while 1:
        print('-------A-------')
        gevent.sleep(1) #用来模拟一个耗时操作,注意不是time模块中的sleep

def B():
    while 1:
        print('-------B-------')
        gevent.sleep(0.5)  #每当碰到耗时操作,会自动跳转至其他协程

g1 = gevent.spawn(A) # 创建一个协程
g2 = gevent.spawn(B)
g1.join()  #等待协程执行结束
g2.join()

执行结果:

-------A-------
-------B-------
-------B-------
-------A-------
-------B-------
-------B-------
-------A-------
-------B-------
-------B-------
···

 

4.协程gevent完成回显服务器:

import gevent
from gevent import monkey,socket

monkey.patch_all()   #有IO才做时需要这一句

s = socket.socket(2,1)  #用的都是gevent模块中的socket,但用法一样
s.setsockopt(1,2,1)
s.bind(('',8080))
s.listen(1024)

def func_accept():
    while 1:
        cs,userinfo = s.accept()
        print('来了一个客户'+str(userinfo))
        g = gevent.spawn(func_recv,cs)  #每当有用户连接,增加一条协程

def func_recv(cs):
    while 1:
        recv_data = cs.recv(1024)
        print(recv_data)  #程谁堵塞了,便会跳转至其他协程
        if len(recv_data) > 0:
            cs.send(recv_data)
        else:
            cs.close()
            break

g1 = gevent.spawn(func_accept)
g1.join()


原文地址:https://www.cnblogs.com/tester-l/p/6064212.html