【Python之路Day11】网络篇之线程、进程、协程

目录:

基本概念

线程

进程

协程

Python操作缓存

 一. 基本概念

现在的操作系统,如Unix、Linux、Windows、Mac OS X等,都是支持“多任务”的操作系统。

什么叫”多任务“呢?简单理解,就是我们可以一般上网浏览某车之家的网页,看看喜欢的车型信息;一边打开某易云音乐听听好歌;一边打开某软件股市行情图,不安的盯着曲线图...卧槽,又尼玛跌了!  这就是多任务喽。

多核心的CPU已经很普及了,但是,就是在过去的单核心CPU上,也可以执行多任务。

PS: CPU是分时间片的,假设任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3, 执行0.01秒....在这个切换的过程中,需要不断的保存程序的执行状态,和恢复执行状态,简单的说,这个过程就叫做”上下文切换", 注意,过多的上下文切换是会带来很多额外的开销的!

对操作系统来说, 一个任务(程序)就是一个进程(Process), 比如打开浏览器访问某车之家,就是一个浏览器的进程。打开一个某易云音乐,就是一个某易云音乐进程(Process)。

这时候我忽然又想打开邮箱查收下邮件,然后在浏览器的标签栏里又打开了Gmail邮箱;然后我又想发条微博说下,卧槽,我在写博客!而后又打开浏览器的一个标签栏,打开了weibo.com... 也就是说在浏览器进程内部,我要同时做好些事情,这些子任务我们称为" 线程 ” (Thread).

每个进程至少要做一件事,所以,一个进程至少有一个线程,当然也可以有多个。多个线程可以同时运行,多线程的执行方式和多进程是一样的,也是由操作系统内核在多个线程(进程)之间快速切换,让每个线程(进程)都交替的运行,看起来就像是同时执行一样。当然,真正同时执行多线程需要多核心CPU才可能实现。

 Python既支持多进程,又支持多线程。

小结:

  • 线程是最小的执行单元,进程由至少一个线程组成;
  • 进程和线程的调度,完全有操作系统决定,程序不能决定什么时候执行和执行多久。
  • 多进程和多线程会会有资源同步和数据共享的问题。后面再续...  

多线程、多进程

  • 1. 一个应用程序可以有多进程、多线程
  • 2. 默认是单进程、单线程
  • 3. 单进程,多线程,在Python中不会性能提升,在Java和C#中可以提升。

提高并发:

  • 多线程: IO操作,一般不会用到CPU,效率提升是可以的
  • 多进程:计算型操作, 需要占用CPU,因此性能不会有提升

线程和进程是一个很重要的概念

一. 线程

1. 基础:

是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

概念太吓人了,先来看一下进程,这个相对于线程来说还是稍微好理解一点的。进程,是程序运行的实体,这句话的意思是,程序是存放在硬盘中的,当这个程序运行时,就会产生若干个进程,并且这个进程是可见的

那么什么是 线程呢,线程是一个任务流,它被包含在进程之中。

(1)线程的状态

线程有五种状态,状态转换如下图:

(2) 线程同步(锁)

多线程的优势在于可以同时运行多个任务,而多个线程之间用到的数据可以共享进程的,效率很高(但在Python中多线程尽量应用在IO密集型的程序中)。正因为这样,所以存在数据不同步的问题.

为了避免资源争夺,所以引入了锁的概念。锁有两种状态:锁定和未锁定(这不废话么?)。每当一个线程要访问共享数据时,必须先加把锁,而后在处理,处理完成后,解锁,让其他线程再来处理。

线程与锁:

(3) 线程通信(条件变量)

条件变量允许线程在条件不满足的时候等待,等到条件满足的时候的时候发出一个通知,而后继续运行。

(4) 线程运行和阻塞的状态转换

阻塞状态的三种情况:

  • 同步阻塞:运行竞争锁定的状态,线程请求锁定时将进入这个状态,一旦成功获得锁定又恢复到运行状态;
  • 等待阻塞:等待其他线程通知的状态,线程获得条件锁定后,调用“等待(wait())”将进入,一旦其他线程发出通知,线程将进入同步阻塞状态,再次竞争条件锁定;
  • 其他阻塞:运行的线程执行sleep()或者join()方法,或者发出I/O请求,会进入阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。

2. 基本使用:

Python通过threading模块来提供对线程的支持。threading基于Java的线程设计模型。锁(Lock)和条件变量(condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而Python中则是独立的对象。

Python Thread提供了Java Thread的行为的子集;

没有优先级、线程组,线程也不能被停止、暂停、恢复、中断。Java Thread中的部分被Python实现了的静态方法在Threading中以模块方法的形式提供。

threading常用方法:

threading.current_thread()   返回当前的线程变量。
threading.enumerate()         返回一个包含正在运行的线程的列表,正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.active_count()      返回正在运行的线程数量,与len(threading.enumerate())一样。

Thread

Thread是线程类,与Java类似,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖 run()。

PS: Thread中的run()方法,就是CPU来调度的时候执行的.

import threading
#创建线程第一种方法: 将要执行的方法做为参数传给方法
def f1(args):
    print(args)

t = threading.Thread(Target=f1, args=(123,))
t.start()
#第二种, 自定义一个类,而后继承threading.Thread
class MyThread(threading.Thread):  #自己写一个类,继承threading.Thread
    def __init__(self,func,args):
        self.func = func
        self.args = args
        super(MyThread, self).__init__()   #执行父类的构造方法

    def run(self):    #而后重写run方法
        self.func(self.args)

def f2(args):
    print(args)

res = MyThread(f2,456)   #传参数进去
res.start()              #开始线程

Thread构造方法(__init__):

  group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None

  • group: 线程组
  • target: 要执行的方法
  • name: 线程名
  • args/kwargs: 要传入的参数

常用方法:

  • is_alive():  返回线程是否在运行(启动前、终止前) 。
  • getName(): 获取当前线程名
  • setName():   设置线程名
  • isDaemon():  获取线程是否是守护线程。
  • setDaemon(): 设置是否是守护进程
    • 如果是后台线程,主线程执行过程中,后台线程也在运行,主线程执行完毕后,后台线程不论成功与否,全部都停止。
    • 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程夜之星完成后,程序停止
  • start()  :  启动线程
  • join([timeout]):   阻塞当前上下文,直到调用此方法的线程终止或到达指定的timeout值
import threading
import time

def f1(args):
    time.sleep(5)
    print(args)

for i in range(10):
    t1 = threading.Thread(target=f1,args=(123,))
    t1.setDaemon(True)  #默认值为False,也就是主线程等待子线程,值为True之后,会不等子线程,直接往下解释执行,该退出就退出
    t1.start()
    t1.join(2)   #主线程阻塞,等待子线程等待完成, 参数的意思是最多等待的秒数. 如果不join的话,程序输出直接就是一个end,也就是子线程根本不会运行


print('End')

3. 线程锁

Lock

由于线程之间是进行随机调度的,每个线程可能只执行n条之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以线程锁就应用而生了。

Lock(指令锁) 是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。

Lock有两种状态: 

  • 锁定
  • 非锁定

构造方法:

  Lock()

以及两个基本的方法。

实例方法:

  • acquire([timeout])   使线程进入同步阻塞状态,尝试获得锁定。
  • release()    释放锁,使用前线程必须已经获得锁定,否则抛出异常。
#未使用锁的实例
import threading

NUM = 10
def f1(args):
    global NUM
    NUM -= 1
    print(NUM)

for i in range(10):
    t = threading.Thread(target=f1,args=(i,))
    t.start()
#使用Lock实例
import threading,time

NUM = 10

lock = threading.Lock()

def f1(args):
    global NUM
    lock.acquire()    #加锁
    NUM -= 1
    time.sleep(1)
    print(NUM)
    lock.release()    #释放锁

for i in range(10):
    t = threading.Thread(target=f1,args=(i,))
    t.start()

Rlock 

RLock() 是一个可以被同一个线程请求多次的同步指令。Rlock使用了“拥有的线程" 和 "递归等级"的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire() , 释放锁时需要调用release()相同次数。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()和release(),计数器将+1/-1,为0时处于未锁定状态。

构造方法:

  RLock()

实例方法:

  acquire()

  release()

import threading,time

NUM = 10

lock = threading.RLock()

def f1(args):
    global NUM
    lock.acquire()    #加锁
    NUM -= 1

    lock.acquire()
    time.sleep(1)
    lock.release()   #应用了几次acquire就需要几次release()
    lock.release()

    print(NUM)


for i in range(10):
    t = threading.Thread(target=f1,args=(i,))
    t.start()

Semaphore(信号量)

了解了上面的Lock和Rlock(互斥锁)之后,发现同时只能允许一个线程来更改数据,这和单线程就每个毛区别了,执行顺序不还是串行的么?而Semaphore是同时允许一定数量的线程来更改数据。可以理解成互斥锁的加强版,一个锁可以控制多个thread的访问,Lock的话,只能让一个线程来访问,Semaphore可以控制数量。

import threading,time

NUM = 10

semaphore = threading.BoundedSemaphore(5)   #最多允许5个线程同时运行,剩余的线程也只能阻塞等待了。。。

def f1(args):
    global NUM
    semaphore.acquire()
    NUM -= 1
    time.sleep(1)
    semaphore.release()

    print(NUM)


for i in range(10):
    t = threading.Thread(target=f1,args=(i,))
    t.start()

Semaphore内部管理着一个内置的计数器,当调用acquire()时计数器-1.调用release()时内置计数器+1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程知道其他线程调用release()。

Event(事件)

event线程间通信的方式,一个线程可以发送信号,其他的线程接受到信号后执行操作。

事件提供了三个主要方法:

  • set   将Flag设置为True
  • wait  阻塞当前线程,直到event的内部标志位被设置为True或者Timeout超时。如果内部标志位为True则wait()函数理解返回。
  • clear  将Flag重新设置为False
  • is_set  返回标志位当前状态

事件处理机制: 全局定义了一个”Flag“, 如果值为False,那么当程序执行event.wait方法时就会阻塞,如果”Flag“值为True,那么event.wait方法时便不再阻塞。

import threading,time

class MyThread(threading.Thread):
    def __init__(self,signal):
        self.signal = signal
        super(MyThread,self).__init__()

    def run(self):   #重写run方法
        print('我是线程: %s, 因为我下面是wait(), 我要阻塞啦...'%self.name)
        self.signal.wait()
        print('我是线程: %s ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...'%self.name)

signal = threading.Event()

for i in range(10):
    t = MyThread(signal)
    t.start()

print('主线程休眠3秒...')
time.sleep(3)
signal.set()



#代码执行结果:
我是线程: Thread-1, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-2, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-3, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-4, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-5, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-6, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-7, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-8, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-9, 因为我下面是wait(), 我要阻塞啦...
我是线程: Thread-10, 因为我下面是wait(), 我要阻塞啦...
主线程休眠3秒...
我是线程: Thread-1 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-4 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-6 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-8 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-10 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-2 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-7 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-3 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-9 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
我是线程: Thread-5 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...

Condition(条件) 

可以把Condition理解为一把高级的锁,它提供了比Lock,RLock更高级的功能,允许控制复杂的线程同步问题。

threading.Condition在内部维护了一个锁对象(默认是RLock),可以在创建Condition对象的时候把锁对象作为参数传入。Condition也提供了acquire和release方法,

含义与锁的一样。提示它只是简单的调用内部锁对象对应的方法而已。

Condition还提供了如下方法(PS:这些方法只有在占用锁acquire之后才能调用,否则会抛异常RuntimeError)

  • lock=threading.Condition()
  • lock.wait() wait方法释放内部所占用的锁,同时线程被挂起,直到接受到通知被唤醒或者超时(如果提供了timeout值的话)。当线程被唤醒并重新占有锁的时候,程序才会继续执行下去。
  • lock.notify() 唤起一个挂起的线程(如果存在挂起的线程)。PS:notify不会释放所占用的锁。
  • lock.notify_all() lock.notifyAll() 唤起所有挂起的线程。PS: 也不会释放占用的锁。
import threading

def run(num):
    con.acquire()   #
    con.wait()      #释放内部占用的锁,挂起线程,等待通知唤醒或者被超时
    print('Run the thread: %s'%num)
    con.release()

con = threading.Condition()

for i in range(10):
    t = threading.Thread(target=run,args=(i,))
    t.start()

while True:
    inp = input('>>>').strip()
    if inp == 'q':   #如果输入的是q,则推出
        break
    con.acquire()     #加锁
    con.notify(int(inp))   #发送通知,并将用户输入的数字转换为整形,发送唤醒的进程数量
    con.release()     #释放锁
import threading

def condition_func():

    ret = False
    inp = input('>>>').strip()
    if inp == '1':      #如果用户输入1,唤醒一个线程
        ret = True

    return ret


def run(num):
    con.acquire()
    con.wait_for(condition_func)   #接受一个函数
    print(condition_func)
    print('Run the thread: %s'%num )
    con.release()

con = threading.Condition()

for i in range(10):
    t = threading.Thread(target=run,args=(i,))
    t.start()

Timer 

定时器,指定n秒之后执行某操作。

from threading import Timer

def f1():
    print('Hello Tom') 

t = Timer(2,f1)   #两秒之后执行f1
t.start()

4. 生产者消费者模型

详见另一篇博文

二. 进程

1. 概念:

是计算机中已运行程序的实体。进程为曾经是分时系统的基本运作单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体;在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器。程序本身只是指令、数据及其组织形式的描述,进程才是程序(那些指令和数据)的真正运行实例。  

2. 基本操作

Unix/Linux操作系统提供了一个fork()的系统调用,它非常特殊。普通的函数调用一次,返回一次。但是fork(),调用一次返回两次,操作系统会自动把当前进程(父进程)复制了一份(子进程),而后,分别在父进程和子进程内返回。

子进程永远返回 0, 而父进程返回子进程的ID,这样做的理由是,一个父进程可以fork出很多的子进程。所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()即可拿到父进程的ID。

Python中的 os模块封装了常见的系统调用,包括fork,可以轻松创建子进程:

import os,time

print("Process (%s) start..." %os.getpid())

pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is (%s)'%(os.getpid(),os.getppid()))
else:
    print('I (%s) just created a child process(%s)'%(os.getpid(),pid))

time.sleep(10)


#执行代码结果:
Process (66554) start...
I (66554) just created a child process(66555)
I am child process (66555) and my parent is (66554)

PS: Windows中没有fork调用。

有了fork调用,一个进程在接到新任务时就可以fork一个子进程来处理新任务,常见的Apache服务器(Prefork模型)就是由父进程监听端口,每当有新的请求,就fork出子进程来处理新的请求。

multiprocessing

multiprocessing 模块是跨平台的多进程模块。multiprocessing模块提供了一个Process类来代表一个进程对象:

from multiprocessing import Process


def f1(i):
    print('hello',i)

for i in range(10):
    p = Process(target=f1,args=(i,))
    p.start()

#执行结果:
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()要简单的多。

Pool(进程池)

如果要启动大量的子进程,可以使用进程池的方式批量创建子进程。进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程位置。

进程池中的两个方法:

  • apply             以串行的方式执行(阻塞的)
  • apply_async   以异步的方式执行(非阻塞的)
  • close()           关闭pool,使其不再接受新的任务
  • terminate()    结束工作进程,不再处理未完成的任务
  • join()            主进程阻塞,等待子进程退出,join方法要在close或者terminate之后使用。
from multiprocessing import Pool
import os,time,random

def long_time_task(name):
    print('Running task %s(%s)...'%(name,os.getpid() ))
    start = time.time()
    time.sleep(random.random()*2)
    end = time.time()
    print('Task %s runs %0.2f seconds.'%(name,(end - start)))


if __name__ == '__main__':
    print('Parent process %s.'%os.getpid())
    p = Pool(5)     #创建一个进程池,并设定执行数量为5
    for i in range(10):
        p.apply_async(func=long_time_task,args=(i,))     #维持执行的进程总数为processes, 当一个进程执行完毕后会添加新的进程进去

    print('Waiting for all subprocess done...')
    p.close()    #使用join之前,先close,否则会报错,执行完close之后,不会有新的进程加入到pool
    p.join()                         #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
    print('All subprocess done.')    #全部执行完后,打印所有进程执行完成

执行结果:

Parent process 66995.
Waiting for all subprocess done...
Running task 0(66996)...
Running task 1(66997)...
Running task 2(66998)...
Running task 3(66999)...
Running task 4(67000)...
Task 2 runs 0.07 seconds.
Running task 5(66998)...
Task 5 runs 0.48 seconds.
Running task 6(66998)...
Task 3 runs 0.65 seconds.
Running task 7(66999)...
Task 4 runs 0.66 seconds.
Running task 8(67000)...
Task 6 runs 0.15 seconds.
Running task 9(66998)...
Task 9 runs 0.25 seconds.
Task 1 runs 0.98 seconds.
Task 0 runs 1.18 seconds.
Task 8 runs 0.99 seconds.
Task 7 runs 1.28 seconds.
All subprocess done.
from multiprocessing import Pool
import os,time,random

def long_time_task(name):
    print('Running task %s(%s)...'%(name,os.getpid() ))
    start = time.time()
    time.sleep(random.random()*2)
    end = time.time()
    print('Task %s runs %0.2f seconds.'%(name,(end - start)))


if __name__ == '__main__':
    print('Parent process %s.'%os.getpid())
    p = Pool(5)     #创建一个进程池,并设定执行数量为5
    for i in range(10):
        p.apply(func=long_time_task,args=(i,))     #维持执行的进程总数为processes, 当一个进程执行完毕后会添加新的进程进去

    print('Waiting for all subprocess done...')
    p.close()    #使用join之前,先close,否则会报错,执行完close之后,不会有新的进程加入到pool
    p.join()                         #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
    print('All subprocess done.')    #全部执行完后,打印所有进程执行完成


#执行代码结果:
Parent process 67049.
Running task 0(67050)...
Task 0 runs 0.05 seconds.
Running task 1(67051)...
Task 1 runs 1.47 seconds.
Running task 2(67052)...
Task 2 runs 0.68 seconds.
Running task 3(67053)...
Task 3 runs 0.91 seconds.
Running task 4(67054)...
Task 4 runs 0.51 seconds.
Running task 5(67050)...
Task 5 runs 0.86 seconds.
Running task 6(67051)...
Task 6 runs 0.93 seconds.
Running task 7(67052)...
Task 7 runs 1.02 seconds.
Running task 8(67053)...
Task 8 runs 0.42 seconds.
Running task 9(67054)...
Task 9 runs 0.41 seconds.
Waiting for all subprocess done...
All subprocess done.
阻塞apply

PS: Pool的默认大小是CPU的核心数,如果你有8核心CPU,你要提交至少9个子进程才能看到上面代码的效果。

进程间通信:

进程之间是需要通信的,操作系统提供了很多机制来实现进程间的通信。

在默认情况下进程间的数据是不共享的, 想象一下,你电脑上安装了某60的安全卫士,安装了某行的网银,这两个进程之间的数据共享、进程之间通信的话?嘿嘿... 

so,默认下,进程间的数据是不共享的。

但是,如果是我们

from multiprocessing import Process
from multiprocessing import Manager

import time,os

l1 = []

def f1(i):
    l1.append(i)  #往l1中追加值
    print('Process: %s, l1: %s' %(os.getpid(),l1))

for i in range(10):
    p = Process(target=f1,args=(i,))    #循环用10个进程,分别追加10个值,如果数据可以共享的话,l1中的值应该最后是0-9
    p.start()

time.sleep(1)
print('Process: %s
l1: %s
End!'%(os.getpid(),l1))   #最后打印进程值和l1的值

#执行代码结果:
Process: 67327, l1: [0]
Process: 67328, l1: [1]
Process: 67329, l1: [2]
Process: 67330, l1: [3]
Process: 67331, l1: [4]
Process: 67332, l1: [5]
Process: 67333, l1: [6]
Process: 67334, l1: [7]
Process: 67335, l1: [8]
Process: 67336, l1: [9]
Process: 67326
l1: []
End!

方法一:Array

PS: 不常用, 也可以实现进程间的数据共享,和Python的列表(链表) 特别相似。Java、C#中的数组。

数组只要定义好了,就必须:

  • 类型必须是一致的;
  • 个数也必须是一定的;
#方法一,Array
from multiprocessing import Process,Array
temp = Array('i', [1,2,3,4])

def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print(i,'----->',item)

for i in range(4):
    p = Process(target=Foo,args=(i,))
    p.start()

#代码执行结果:
0 -----> 100
0 -----> 2
0 -----> 3
0 -----> 4
1 -----> 100
1 -----> 101
1 -----> 3
1 -----> 4
2 -----> 100
2 -----> 101
2 -----> 102
2 -----> 4
3 -----> 100
3 -----> 101
3 -----> 102
3 -----> 103
'c': ctypes.c_char,  'u': ctypes.c_wchar,
'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int,      'I': ctypes.c_uint,
'l': ctypes.c_long,   'L': ctypes.c_ulong,
'f': ctypes.c_float,   'd': ctypes.c_double
Array 类型对应表

方法二:Manager

from multiprocessing import Process,Manager

manage = Manager()
dic = manage.dict()

def f1(i):
    dic[i] = 100+i
    print(dic.values())

for i in range(5):
    p = Process(target=f1,args=(i,))
    p.start()
    p.join()

#代码执行结果:
[100]
[100, 101]
[100, 101, 102]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]

方式三: Queue(队列,常用)

#方法三: queue

from multiprocessing import Process
from multiprocessing import Queue

def f1(i,q):
    print(i,q.get())

if __name__ == '__main__':
    q = Queue()

    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)

    for i in range(5):
        p = Process(target=f1,args=(i,q))
        p.start()

#代码执行结果:
0 1
1 2
2 3
3 4
4 5

当创建进程时(非使用时), 共享数据会被拿到子进程中,当进程中的执行完毕后,再赋值给原值:

from multiprocessing import Process,Array,RLock

def f1(l,t,i):
    l.acquire()
    t[0] = 100 + i  #将第0个数+100
    for item in temp:
        print(i,'---->',item)
    l.release()

lock = RLock()
temp = Array('i',[1,2,3,4])

for i in range(20):
    p = Process(target=f1,args=(lock,temp,i))
    p.start()

#代码执行结果:
0 ----> 100
0 ----> 2
0 ----> 3
0 ----> 4
1 ----> 101
1 ----> 2
1 ----> 3
1 ----> 4
2 ----> 102
2 ----> 2
2 ----> 3
2 ----> 4
3 ----> 103
3 ----> 2
3 ----> 3
3 ----> 4
4 ----> 104
4 ----> 2
4 ----> 3
4 ----> 4
5 ----> 105
5 ----> 2
5 ----> 3
5 ----> 4
6 ----> 106
6 ----> 2
6 ----> 3
6 ----> 4
7 ----> 107
7 ----> 2
7 ----> 3
7 ----> 4
8 ----> 108
8 ----> 2
8 ----> 3
8 ----> 4
9 ----> 109
9 ----> 2
9 ----> 3
9 ----> 4
10 ----> 110
10 ----> 2
10 ----> 3
10 ----> 4
11 ----> 111
11 ----> 2
11 ----> 3
11 ----> 4
12 ----> 112
12 ----> 2
12 ----> 3
12 ----> 4
13 ----> 113
13 ----> 2
13 ----> 3
13 ----> 4
14 ----> 114
14 ----> 2
14 ----> 3
14 ----> 4
15 ----> 115
15 ----> 2
15 ----> 3
15 ----> 4
16 ----> 116
16 ----> 2
16 ----> 3
16 ----> 4
17 ----> 117
17 ----> 2
17 ----> 3
17 ----> 4
18 ----> 118
18 ----> 2
18 ----> 3
18 ----> 4
19 ----> 119
19 ----> 2
19 ----> 3
19 ----> 4

三. 协程

线程和进程的操作都是由程序触发操作系统接口,最后执行的是系统,协程的操作则是程序员本身。

协程(Coroutine),又称为微线程,纤程. 

协程存在的意义:

  • 对于多线程应用,CPU通过切片的方式来切换进程间的执行,线程切换时需要耗时(保存状态、恢复状态下次继续,即,上下文切换)。因此没有线程切换的开销,线程数量越多,协程的性能优势就越是明显。
  • 协程,则只使用一个线程,在一个线程中规定某个代码块的执行顺序。
  • 不需要多线程的锁机制,因为只要只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

协程适用的场景:

  当程序中存在大量不需要CPU的操作时(如I/O操作), 适用于协程。

子程序调用总是一个入口,一次返回,调用顺序是明确的。如: 函数A调用B,B在执行过程中调用C,C执行完毕返回,B执行完毕返回,最后是A。

def A():
    B()

def B():
    print('B')
    C()

def C():
    print('C')

A()

但是,协程的调用和子程序不同。协程看上去也是子程序,但在其内部执行过程中可以中断,而后去执行其他的子程序,在适当的时候在返回再接着执行。

greenlet

from greenlet import greenlet

def f1():
    print(123)
    gr2.switch()   #1, 跳到下面321
    print(456)
    gr2.switch()   #3. 跳到下面654

def f2():
    print(321)
    gr1.switch()   #2. 跳到上面456
    print(654)


gr1 = greenlet(f1)
gr2 = greenlet(f2)

gr1.switch()


#代码执行结果:
123
321
456
654

gevent

gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API.

在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

基本思想:

当一个greenlet遇到I/O操作时,比如访问网络,就自动切换到其他的greenlet,等到I/O操作完成,再在适当的时候切换回来继续执行。由于I/O操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换线程,就可以保证greenlet在运行,而不是等待I/O。

import gevent,os

def f1():
    print('Running in f1(%s)'%os.getpid())
    gevent.sleep(0)
    print('Explicit context switch to f1 again')

def f2():
    print('Explicit context to f2(%s)'%os.getpid())
    gevent.sleep(0)
    print('Implicit context switch back to f1')

gevent.joinall(
    [
        gevent.spawn(f1),
        gevent.spawn(f2),
    ]
)

#代码执行结果:
Running in f1(68207)
Explicit context to f2(68207)
Explicit context switch to f1 again
Implicit context switch back to f1
from gevent import monkey; monkey.patch_all()    #把socket请求做了一个封装,完成后才具有这种功能
import gevent
import requests


def f(url):
    print('GET: %s' %url)
    resp = requests.get(url)
    data = resp.text

    print('%d bytes received from %s. '%(len(data),url))

gevent.joinall([
    gevent.spawn(f,'https://www.python.org/'),
    gevent.spawn(f,'https://www.yahoo.com/'),
    gevent.spawn(f,'https://github.com/'),
    gevent.spawn(f,'https://www.dbq168.com/'),
])


#代码执行结果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
GET: https://www.dbq168.com/
464535 bytes received from https://www.yahoo.com/. 
47394 bytes received from https://www.python.org/. 
76292 bytes received from https://www.dbq168.com/. 
25533 bytes received from https://github.com/. 

从结果看,4个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。

如果要让greenlet交替运行,可以通过gevent.sleep() 交出控制权。更多>>

五. Python 操作缓存

本章节包括Python操作Memcached以及Redis,详见另一篇博文.

原文地址:https://www.cnblogs.com/dubq/p/5680489.html