No.012-Python-Learning Notes-Day9-GIL|Thread|Process|Coroutine

GIL: the Python Global Interpreter Lock

The official description of the GIL:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

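The GIL is essentially a mutex, and a mutex turns parallel execution into serial execution. Python threads are native OS threads, which Python cannot schedule directly. When several threads in one process (including the interpreter's own threads) operate on the same block of data, the data can be corrupted. The simplest remedy is a lock, so CPython has one global lock, the GIL. It ensures that only one thread in the process executes Python bytecode at any given time.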

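The GIL is not a feature of the Python language. It is an implementation detail of CPython, often regarded as a design flaw, and it prevents CPython threads from running Python bytecode on multiple cores in parallel. Jython and IronPython have no GIL; PyPy, however, does have one.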

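So Python threads suit I/O-bound tasks. For pure-Python CPU-bound tasks they bring no speed-up, and they can even be slower than a single thread because of frequent context switches and contention for the GIL.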
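
The following is a minimal sketch of the I/O-bound case. It uses time.sleep() as a stand-in for blocking I/O, which is an assumption: like real blocking I/O, sleep releases the GIL while waiting. Five threads each wait 0.2 s. Because the waits overlap, the total is close to 0.2 s rather than 1 s. The helper names fake_io and run_threaded are illustrative only.

```python
import threading
import time

def fake_io(duration):
    # time.sleep() releases the GIL while waiting, as blocking I/O does
    time.sleep(duration)

def run_threaded(n_tasks=5, duration=0.2):
    threads = [threading.Thread(target=fake_io, args=(duration,)) for _ in range(n_tasks)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

elapsed = run_threaded()
# The waits overlap, so this is close to 0.2 s rather than 5 * 0.2 = 1.0 s
print("5 I/O-bound tasks with threads: %.2f s" % elapsed)
```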

The GIL vs. ordinary mutexes

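The GIL guarantees that only one thread executes Python bytecode at any moment. In Python 2, a thread releases the GIL when it performs blocking I/O or after every 100 ticks (roughly, bytecode instructions). Since Python 3.2, the tick counter has been replaced by a time-based switch interval, which is 5 ms by default.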
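
You can inspect and change the Python 3 switch interval through the sys module. This is a small sketch; the value 0.001 is an arbitrary example.

```python
import sys

# Python 3.2+ uses a time-based switch interval instead of the 100-tick counter
original = sys.getswitchinterval()
print("switch interval: %s s" % original)  # 0.005 by default

sys.setswitchinterval(0.001)     # ask the running thread to offer the GIL more often
changed = sys.getswitchinterval()
sys.setswitchinterval(original)  # restore the previous value
print("temporarily set to: %s s" % changed)
```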

An ordinary mutex, by contrast, makes threads modify shared data one at a time, so that their updates cannot get mixed up.

As shown in the figure, the GIL protects the Python interpreter itself. User data must be protected by a mutex of your own.

A scenario in which a mutex and the GIL are used together

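(1) Several threads are running. Suppose Thread1 holds the GIL and can use the CPU. Thread1 then acquires the mutex lock, so it may modify data (but it has not started modifying it yet).

(2) Before modifying data, Thread1 performs an I/O operation, or its tick count reaches 100. Note that it has not yet reached the code that modifies data. Thread1 therefore releases the GIL, which is now up for grabs.

(3) Thread1 and Thread2 compete for the GIL. If Thread1 released the GIL because it blocked on I/O, Thread2 is certain to get it. If Thread1 released it because its tick count reached 100, the two threads compete fairly.

(4) Suppose Thread2 gets the GIL and runs the code that modifies the shared data. Because Thread1 holds lock, Thread2 cannot modify data. Thread2 blocks on lock and gives up the GIL, which is contended again.

(5) Suppose Thread1 wins the GIL again. Since it holds lock, it can go on modifying data. Only after Thread1 finishes and releases lock can Thread2 modify data, once it holds both the GIL and lock.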

With and without a mutex (the effect is reproducible with Python 2 on Linux)

Running without the lock on Python 2.7:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading, time

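lock = threading.Lock()  # create a lock; Python 3 does not add locking for you, the race is just rarer (the GIL switches every 5 ms instead of every 100 ticks)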


def run():
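    # lock.acquire()  # acquire the lock
    n = 0
    num = nums[0]   # read the shared value
    while n < 100:  # the loop separates the read from the write-back
        n += 1      # and makes a tick-based GIL switch in between likely
    num += 1
    nums[0] = num   # write the value back
    # lock.release()  # release the lock once the update is done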


nums = [1]

for i in range(1000):
    t = threading.Thread(target=run)
    t.start()

while threading.active_count() != 1:
    time.sleep(0.1)
else:
    print(nums[0])

'''
RESULT: not the expected 1001
[root@sq ~]# python lock_test.py 
955
[root@sq ~]# python lock_test.py 
967
[root@sq ~]# python lock_test.py 
969
'''

Running with the lock on Python 2, i.e. with the two lock lines above uncommented:

[root@sq ~]# python lock_test.py 
1001
[root@sq ~]# python lock_test.py 
1001
[root@sq ~]# python lock_test.py 
1001
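
On Python 3 the race in the example above rarely shows up, but it has not gone away. The sketch below makes it visible by sleeping between the read and the write-back. The time.sleep(0.001) is an assumption added only to widen that window. The lock protects the whole read-modify-write, and the with statement is the idiomatic way to pair acquire() and release(). Without the `with lock:` line, the printed value would typically be far below 100.

```python
import threading
import time

counter = 0
lock = threading.Lock()

def add_one():
    global counter
    with lock:  # acquire() on entry, release() on exit, even if an exception occurs
        value = counter      # read
        time.sleep(0.001)    # widen the gap between the read and the write-back
        counter = value + 1  # write back

threads = [threading.Thread(target=add_one) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 100: no update is lost while the lock is held
```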

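A plain threading.Lock() cannot be acquired twice, even by the thread that holds it. If that thread calls acquire() again, the second acquire() waits for a release that never comes, and the thread deadlocks. This is nested locking, as when run3() below calls run1() and run2() while holding the lock. In that case, use a re-entrant (recursive) lock, threading.RLock(). The thread that owns it may acquire it again, as long as it releases it the same number of times.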

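import threading
import time

"""
Re-entrant (recursive) lock: run3() holds the lock while calling run1() and run2(),
which acquire the same lock again; with a plain threading.Lock() this would hang (deadlock).
"""

num1, num2 = 0, 0


lock = threading.RLock()  # a re-entrant lock: the owning thread may acquire it repeatedly

def run1():
    global num1
    print("---run1---")
    lock.acquire()
    num1 += 1
    lock.release()
    return num1

def run2():
    global num2
    print("---run2---")
    lock.acquire()
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()
    res1 = run1()  # acquires the same lock a second time from the same thread
    print("---run3 run1-2----")
    res2 = run2()
    lock.release()
    print("value is [{}][{}]".format(res1, res2))

for i in range(10):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())  # number of threads still alive
    time.sleep(0.1)  # avoid a busy loop that floods the output
else:
    print("all thread is over;")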
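
You can check why a plain Lock would hang without actually deadlocking. Calling acquire(blocking=False) returns False instead of waiting when the lock cannot be taken. The sketch below uses try_reacquire, a hypothetical helper written for this check.

```python
import threading

def try_reacquire(lock):
    """Acquire a lock, then try to acquire it again from the same thread."""
    lock.acquire()
    try:
        # blocking=False makes acquire() return False instead of waiting forever
        got_it_again = lock.acquire(blocking=False)
        if got_it_again:
            lock.release()  # balance the second acquire
        return got_it_again
    finally:
        lock.release()

print("Lock  re-acquired:", try_reacquire(threading.Lock()))   # False: a blocking acquire would deadlock
print("RLock re-acquired:", try_reacquire(threading.RLock()))  # True: the owner may re-enter
```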

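To limit how many threads run at the same time, use a semaphore, threading.Semaphore(num). For example, 100,000 threads performing I/O concurrently without any limit could overwhelm the system. A semaphore caps the number of threads that can run the guarded code at once.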

semaphore.acquire() -> num -= 1

semaphore.release() -> num += 1

When num is 0, acquire() blocks until another thread calls release(); the counter never goes below 0.

import threading
import time

def get_time():
    print("{} is waiting sq.".format(threading.current_thread()))
    sq.acquire()
    print("{} is get sq.".format(threading.current_thread()))
    print(sq._value)  # the semaphore's current counter (_value is private, used here only for illustration)
    time.sleep(1)
    print(time.ctime())
    sq.release()
    print("{} release sq.".format(threading.current_thread()))

sq = threading.Semaphore(5)

for i in range(100):
    t = threading.Thread(target=get_time)
    t.start()

while threading.active_count() != 1:
    time.sleep(0.1)
else:
    print("All Thread is over....")

'''
<Thread(Thread-1, started 38160)> is waiting sq.
<Thread(Thread-1, started 38160)> is get sq.
<Thread(Thread-2, started 33740)> is waiting sq.
<Thread(Thread-2, started 33740)> is get sq.
<Thread(Thread-3, started 9488)> is waiting sq.
<Thread(Thread-3, started 9488)> is get sq.
<Thread(Thread-4, started 8076)> is waiting sq.
<Thread(Thread-4, started 8076)> is get sq.
<Thread(Thread-5, started 35684)> is waiting sq.
<Thread(Thread-5, started 35684)> is get sq.
<Thread(Thread-6, started 6804)> is waiting sq.
<Thread(Thread-7, started 33728)> is waiting sq.
<Thread(Thread-8, started 28308)> is waiting sq.
...
'''
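
The output above shows five threads getting in while the rest wait. The sketch below measures this directly, assuming a limit of 3 and a 0.05 s sleep in place of real I/O. A second, ordinary Lock guards the counters themselves, since the semaphore lets three threads in at once.

```python
import threading
import time

sem = threading.Semaphore(3)   # at most 3 threads inside the guarded block
state_lock = threading.Lock()  # protects the two counters below
active = 0
max_active = 0

def worker():
    global active, max_active
    with sem:  # acquire() on entry, release() on exit
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)  # pretend to do some I/O
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("max concurrent workers:", max_active)  # never more than 3
```

threading.BoundedSemaphore works the same way but raises ValueError if release() is called more often than acquire(), which catches counting bugs early.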

Thread vs. Process

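A thread is the smallest unit of execution that the OS schedules onto a CPU. It is a stream of instructions sent by the OS to the CPU, and it is the unit that actually does the work inside a process.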

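A process: each program has its own independent memory, and the OS manages each program as a whole. The OS wraps the program together with the resources it uses, such as memory and network interfaces.
This collection of managed resources is called a process.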

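How processes and threads relate

For a process to run on the CPU, it must first create at least one thread. A process by itself cannot execute anything; it is only a collection of resources.

All threads in a process share the process's memory space. When they access the same data, a mutex is used to serialize access and keep it consistent.

When a process starts, it creates one thread, the main thread, which can create child threads. Child threads run independently of the main thread. Unless they are marked as daemon threads, they keep running after the main thread finishes, and the process lives on until they are done.

Differences between processes and threads

a. Threads share their process's memory space, while each process (including child processes) has its own memory. A child process copies the parent's memory into a separate memory space of its own.

b. Threads in the same process can communicate directly (sharing data and passing messages). Two processes can only communicate through an intermediary.

c. A new thread is cheap to create; a new process requires a clone of its parent process.

d. A thread can control and operate on other threads in the same process, whereas a process can only operate on its child processes.

e. Changes to the main thread may affect the behavior of other threads in the same process, whereas changes to a parent process have no effect on its child processes.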
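
Point a can be seen directly. A thread's change to a module-level list is visible to the main thread, while a child process only changes its own copy. This is a minimal sketch; run_demo and append_item are illustrative names. The __main__ guard is needed where processes are started by spawning, as on Windows.

```python
import multiprocessing
import threading

data = []  # module-level list: shared by threads, not by processes

def append_item():
    data.append("changed")

def run_demo():
    data.clear()  # start from an empty list every time
    t = threading.Thread(target=append_item)
    t.start()
    t.join()
    after_thread = list(data)   # the thread modified the parent's own list

    p = multiprocessing.Process(target=append_item)
    p.start()
    p.join()
    after_process = list(data)  # unchanged: the child appended to its own copy
    return after_thread, after_process

if __name__ == "__main__":  # required where processes are spawned (e.g. Windows)
    print(run_demo())  # (['changed'], ['changed'])
```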

Two ways to create a Thread

Method 1: call threading.Thread directly

import threading, time
"""
the simplest threading;
"""

# a task
def run(n):
    print("task", n)
    time.sleep(2)

# start two threads that run this task
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()

Method 2: subclass threading.Thread

import threading, time
"""
the threading of a class;
"""

class Mythread(threading.Thread):

    def __init__(self, n):
        super(Mythread, self).__init__()
        self.n = n

    def run(self):
        print("running task", self.n)
        time.sleep(10)


t1 = Mythread("t1")
t2 = Mythread("t2")
t1.start()
t2.start()

Daemon threads

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.
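
Here is a sketch of the graceful alternative the note recommends: a non-daemon background thread that loops until an Event is set. The heartbeat function and the 0.05 s interval are illustrative assumptions.

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def heartbeat():
    # wait() returns False on timeout and True once the event is set,
    # so this loop does one unit of work every 0.05 s until asked to stop
    while not stop_event.wait(0.05):
        ticks.append("keepalive")
    ticks.append("cleaned up")  # a graceful exit can release resources here

t = threading.Thread(target=heartbeat)  # non-daemon: the program waits for it
t.start()
time.sleep(0.2)   # let the worker run for a while
stop_event.set()  # ask the worker to stop
t.join()          # returns once the loop has exited cleanly
print(ticks[-1], t.is_alive())  # cleaned up False
```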

To make a thread a daemon, set its daemon flag to True before calling start():

import threading
import time
import random

data = 1

def add():
    global data
    time.sleep(random.randrange(2))
    data += 1
    print("{} is over.".format(threading.current_thread()))


for i in range(10):
    t = threading.Thread(target=add)
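    t.daemon = True  # set the daemon flag to True (setDaemon() is deprecated since Python 3.10)
    t.start()

print(data)  # the main thread ends right after this print

''' # In this run only 4 threads finished before the main thread ended; the remaining daemon threads were killed at exit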
<Thread(Thread-6, started daemon 29724)> is over.
<Thread(Thread-7, started daemon 7084)> is over.
<Thread(Thread-8, started daemon 5420)> is over.
<Thread(Thread-10, started daemon 29996)> is over.
'''

Thread.join()

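join() blocks the calling thread (for example, the main thread) until the thread whose join() is called has finished, or until the optional timeout in seconds expires. Syntax: thread.join([timeout])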

import threading, time

"""
the threading of a class;
start a threading with a loop;
show how to use "thread_obj.join(timeout=...)"
"""
class Mythread(threading.Thread):

    def __init__(self, n):
        super(Mythread, self).__init__()
        self.n = n

    def run(self):
        print("running task-", self.n, threading.current_thread())
        time.sleep(2)
        print("running over-%d" % self.n)

start = time.time()
for i in range(50):
    t1 = Mythread(i)
    t1.start()
end = time.time()
print("111", end-start)
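# The elapsed time above is meaningless: the main program is itself a thread that runs
# concurrently with the others, and by default it does not wait for child threads to finish.
# That default can be changed:
# t1.join()  # the main thread waits for this child thread to finish
# threading.active_count() returns the number of threads currently alive
# threading.current_thread() returns the thread that is currently running

# Method 1: join each thread right after starting it. The main thread waits for
# each one in turn, so the threads effectively run one after another (about 100 s here).
# for i in range(50):
#     t = Mythread(i)
#     t.start()
#     t.join()

# Method 2: start all threads first, then join them in the order they were started.
# They run concurrently, so this takes about 2 s.
start = time.time()
t_list = []
for i in range(50):
    t = Mythread(i)
    t.start()
    t_list.append(t)
for thread in t_list:
    thread.join()
end = time.time()
print(threading.current_thread(), end-start)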
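
With a timeout, join() returns when the thread ends or when the timeout expires, whichever comes first, and it always returns None. Call is_alive() afterwards to tell which happened. This sketch uses an arbitrary 0.5 s task and a 0.1 s timeout.

```python
import threading
import time

def slow_task():
    time.sleep(0.5)

t = threading.Thread(target=slow_task)
t.start()
t.join(timeout=0.1)       # gives up after about 0.1 s; join() itself returns None
timed_out = t.is_alive()  # True: the thread is still running, so the join timed out
t.join()                  # no timeout: block until the thread really finishes
finished = not t.is_alive()
print(timed_out, finished)  # True True
```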

threading.Timer

This class represents an action that should be run only after a certain amount of time has passed.

Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

Using Timer to run a function at regular intervals (open questions: is the global required? How can a backlog of threads be reproduced?)

import threading
import time

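def hello(name):
    print("Hello %s" % name)
    print("Active thread count: %s" % threading.active_count())
    global timer  # only needed if other code must reach the latest Timer, e.g. to cancel() it
    timer = threading.Timer(1.0, hello, ["Bruce"])  # schedule the next run, in a new thread
    timer.start()
    time.sleep(10)  # this run keeps its thread busy for 10 s, so several Timer threads are alive at once
    print("Thread %s is over." % threading.current_thread().ident)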


if __name__ == "__main__":
    timer = threading.Timer(1.0, hello, ["Bruce"])
    timer.start()
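
In the example above, each run starts the next Timer before its own 10 s sleep, so Timer threads overlap. The sketch below shows one way to avoid that pile-up and to stop the cycle cleanly. It re-arms the timer only after the function returns. It also keeps the current Timer on an object instead of in a global, so that cancel() can reach it. RepeatingTimer is a hypothetical helper, not part of threading.

```python
import threading
import time

class RepeatingTimer:
    """Hypothetical helper: re-arms a threading.Timer after every run."""

    def __init__(self, interval, func):
        self.interval = interval
        self.func = func
        self._lock = threading.Lock()
        self._timer = None
        self._stopped = False

    def _schedule(self):
        # the caller must hold self._lock
        self._timer = threading.Timer(self.interval, self._run)
        self._timer.daemon = True
        self._timer.start()

    def _run(self):
        self.func()  # the next Timer is created only after this returns
        with self._lock:
            if not self._stopped:
                self._schedule()

    def start(self):
        with self._lock:
            self._schedule()

    def cancel(self):
        with self._lock:
            self._stopped = True
            if self._timer is not None:
                self._timer.cancel()  # no effect if it has already fired

calls = []
rt = RepeatingTimer(0.05, lambda: calls.append(time.monotonic()))
rt.start()
time.sleep(0.3)
rt.cancel()
time.sleep(0.1)  # let a run that was already in progress finish
count_after_cancel = len(calls)
time.sleep(0.2)
print(count_after_cancel, len(calls))  # the count no longer grows after cancel()
```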

threading.Event

An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()  # create an Event object

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()

If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
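
The wait() method also accepts a timeout and returns the flag's value. This makes it easy to tell a timeout apart from a set event, as the following short sketch shows.

```python
import threading

event = threading.Event()
initially_set = event.is_set()            # False: a new Event starts cleared
timed_out = event.wait(timeout=0.05)      # False: the flag is still clear when the timeout expires
event.set()
after_set = event.wait(timeout=0.05)      # True: returns at once because the flag is set
event.clear()
after_clear = event.is_set()              # False again
print(initially_set, timed_out, after_set, after_clear)  # False False True False
```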

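An Event lets two or more threads interact. In the traffic-light example below, the light thread sets and clears the event, and each car thread acts differently depending on the event's flag.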

import threading
import time


event = threading.Event()

def light():
    count = 0
    event.set()  # start with a green light
    while True:
        if 30 >= count > 20:  # switch to red
            event.clear()
            print("\033[41;1mred light is on...\033[0m")
        elif count > 30:  # back to green, restart the cycle
            event.set()
            count = 0
        else:
            print("\033[44;1mgreen light is on...\033[0m")
        time.sleep(0.5)
        count += 1


def car(name):
    while True:
        if event.is_set():  # isSet() is a deprecated alias
            print("Green light... the car {} crosses the road.".format(name))
            break
        else:
            print("Red light, the car {} is waiting.".format(name))
            event.wait()  # block until the light turns green

# daemon: the light loops forever, so it must not keep the program alive
l = threading.Thread(target=light, daemon=True)
l.start()

cars = ["Bruce", "Amadeus", "Lee", "Vick", "John", "Google"]

car_threads = []
for car_nam in cars:
    t = threading.Thread(target=car, args=(car_nam, ))
    time.sleep(3)
    t.start()
    car_threads.append(t)

for t in car_threads:  # wait for every car instead of busy-waiting on active_count()
    t.join()
print("thread is over;")

queue

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads. There are three types of queue:

class queue.Queue(maxsize=0)          # first in, first out (FIFO)
class queue.LifoQueue(maxsize=0)      # last in, first out (LIFO)
class queue.PriorityQueue(maxsize=0)  # the lowest-valued entry is retrieved first

Some methods of Queue

Queue.qsize()  # return the approximate size of the queue
Queue.empty()  # return True if empty
Queue.full()   # return True if full
Queue.put(item, block=True, timeout=None)  # block: whether to block; timeout: how long to block before raising queue.Full
Queue.put_nowait(item)  # equivalent to Queue.put(item, block=False)
Queue.get(block=True, timeout=None)  # block: whether to block; timeout: how long to block before raising queue.Empty
Queue.get_nowait()  # equivalent to Queue.get(block=False)
Queue.task_done()  # called by a consumer thread after it has finished processing an item obtained with get(); tells join() that one more task is complete
Queue.join()  # block until every item put into the queue has been retrieved and processed, i.e. task_done() has been called once per item
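
The following sketch shows how task_done() and join() work together, following the worker pattern from the queue documentation. Three daemon worker threads square numbers. The main thread's q.join() returns only after task_done() has been called once for every item. The worker function and the squaring are illustrative.

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    while True:
        item = q.get()            # blocks until an item is available
        with results_lock:
            results.append(item * item)
        q.task_done()             # exactly one task_done() per get()

# daemon workers loop forever and are killed when the program exits
for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    q.put(n)

q.join()  # blocks until task_done() has been called for all 10 items
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```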

What queues are for

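a. The producer does not have to wait: a program with data to hand off can put it into the queue and go on with other work, which improves efficiency.
b. Producing data and consuming it are not tied together. Both sides deal only with the queue, which decouples the program.
c. A queue can be thought of as an ordered container.

The most direct difference between a list and a queue

a. A list is also a container, so why not use a list? In a queue, each item exists only once: once it has been taken out, it is gone. Reading a list, by contrast, leaves the items in place. queue.Queue also does its own locking, so multiple threads can use it safely.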

Simple usage: FIFO queue

import queue

q_first_first = queue.Queue(maxsize=3)  # first in, first out
q_first_first.put(1)
q_first_first.put(2)
q_first_first.put(3)
print(q_first_first.get())  # 1
print(q_first_first.get())  # 2

Simple usage: LIFO queue

import queue

q_first_first = queue.LifoQueue(maxsize=3)  # last in, first out
q_first_first.put(1)
q_first_first.put(2)
q_first_first.put(3)
print(q_first_first.get())  # 3
print(q_first_first.get())  # 2

Simple usage: priority queue

import queue

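q = queue.PriorityQueue()  # priority queue: the entry with the lowest value is retrieved first
q.put((1, 1))
q.put((2, 2))
q.put((-1, 3))
q.put((-2, 4))
q.put((-1, 5))
q.put((6, 6))
print(q.get())  # (-2, 4)
print(q.get())  # (-1, 3): ties on the first element are broken by the second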
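
PriorityQueue compares whole entries. When two priorities are equal, Python goes on to compare the payloads, which fails for types such as dict. The sketch below follows the workaround from the queue documentation: a dataclass whose payload field is excluded from comparison. The seq tie-breaker is an assumption added here; it also keeps equal priorities in insertion order.

```python
import itertools
import queue
from dataclasses import dataclass, field
from typing import Any

_seq = itertools.count()  # tie-breaker: FIFO order among equal priorities

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    seq: int = field(default_factory=lambda: next(_seq))
    item: Any = field(default=None, compare=False)  # the payload is never compared

q = queue.PriorityQueue()
q.put(PrioritizedItem(2, item={"job": "b"}))
q.put(PrioritizedItem(1, item={"job": "a"}))
q.put(PrioritizedItem(2, item={"job": "c"}))
order = [q.get().item["job"] for _ in range(3)]
print(order)  # ['a', 'b', 'c']

# Plain tuples fail as soon as two priorities tie, because the dicts get compared
plain = queue.PriorityQueue()
plain.put((2, {"job": "x"}))
try:
    plain.put((2, {"job": "y"}))
    tuple_failed = False
except TypeError:
    tuple_failed = True
print(tuple_failed)  # True
```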

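The producer-consumer model

In concurrent programming, the producer-consumer model solves a great many concurrency problems. It speeds up a program's overall data processing by balancing the work capacity of producer threads and consumer threads.

Implementing the producer-consumer model

The producer-consumer pattern uses a container to remove the tight coupling between producers and consumers. Producers and consumers do not talk to each other directly; they communicate through a blocking queue. After producing an item, the producer does not wait for a consumer to process it but simply hands it to the queue. Consumers do not ask the producer for data; they take it straight from the queue. The blocking queue thus acts as a buffer that balances the processing capacity of producers and consumers.

A simple producer/consumer example: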

import threading
import queue
import time
import random

q = queue.Queue(10)

def produce():
    count = 0
    while count < 100:
        p = "product-%s" % count
        q.put(p)  # blocks while the queue is full, instead of busy-waiting on q.full()
        print("Produced [%s]" % p)
        count += 1
        time.sleep(0.1)
    q.join()  # wait until every product has been used; only task_done() tells the queue that
    print("Produced %s products in total (the last was [%s]), and all have been used up." % (count, p))


def customer(num, n):
    for i in range(n):
        try:
            item = q.get_nowait()  # avoids the race between checking q.empty() and calling q.get()
        except queue.Empty:
            print("Customer<%s> came too late." % num)
            break
        print("Customer<%s> consumed [%s]." % (num, item))
        q.task_done()  # tell the producer thread that one product has been used
        time.sleep(0.3)


p = threading.Thread(target=produce)
p.start()

for i in range(100):
    c = threading.Thread(target=customer, args=(i, random.randint(1, 4)))
    c.start()
    c.join()  # customers come one at a time

while threading.active_count() != 1:
    time.sleep(0.1)
else:
    print("All customers have left.")

Multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency.

Creating a process and checking process IDs

"""
# Basic process creation; the syntax is similar to threading.
On Linux, every process is started by its parent process.
os.getppid() returns the pid of the current process's parent
os.getpid() returns the pid of the current process
"""
import os
import multiprocessing

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == "__main__":
    info('\033[41;1mmain    \033[0m')
    p = multiprocessing.Process(target=f, args=('Bruce', ))
    p.start()
    p.join()

Inter-process communication

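Because each process has its own memory, processes cannot access each other's data directly the way threads can. They communicate through an intermediary: Queues, Pipes, or Managers.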

Queues

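Unlike a threading queue, which only threads of the same process can use, a process queue lets processes exchange data.

How a process queue lets processes communicate:
  1. the parent process passes (a copy of) the queue to the child process;
  2. the child process puts data into the queue;
  3. the process queue internally wraps an intermediary;
  4. the child serializes (pickles) the data and passes it to the intermediary;
  5. the intermediary passes it on, and it is deserialized in the parent process.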

import multiprocessing

def f(qq):
    qq.put(["Bruce", "Lee"])

if __name__ == "__main__":
    q = multiprocessing.Queue()  # under PyCharm on Windows this raised: PermissionError: [WinError 5] Access is denied.
    #q = multiprocessing.Manager().Queue()  # switching to this queue made the error go away (cause still to be investigated)
    p1 = multiprocessing.Process(target=f, args=(q, ))
    p1.start()
    print(q.get())
    p1.join()

Pipe

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

For example:

import multiprocessing

def f(conn):
    conn.send(["Bruce", "Lee", "From Child"])
    conn.send(["Bruce", "Lee", "From Child2"])
    conn.close()

if __name__ == "__main__":
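    parent_conn, child_conn = multiprocessing.Pipe()  # Pipe() returns both ends
    p1 = multiprocessing.Process(target=f, args=(child_conn, ))
    p1.start()
    child_conn.close()  # close the parent's copy of the child end so that recv() can detect EOF
    print(parent_conn.recv())
    print(parent_conn.recv())
    try:
        print(parent_conn.recv())  # the child sent only two messages
    except EOFError:  # raised once the child has closed its end; without the close() above, recv() would block forever
        print("no more data: the child closed its end")
    p1.join()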

Manager

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example:

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)


if __name__ == '__main__':
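    with Manager() as manager:  # roughly manager = Manager(), plus an automatic shutdown() when the block exits
        d = manager.dict()  # a dict that can be passed to and shared between processes

        l = manager.list(range(5))  # a list that can be passed to and shared between processes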
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)

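Note: the processes do not each get a copy of the object. The real dict and list live in the manager's server process, and every process holds a proxy that forwards each method call to that server. A single call such as l.append() is therefore safe. A compound operation such as d[k] += 1 (a read followed by a write), however, is not atomic and still needs a lock.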
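
Because d[k] += 1 on a proxy is a read followed by a separate write, concurrent processes can still lose updates. The sketch below guards the compound update with a lock obtained from the same Manager; Lock is one of the supported types listed above. The worker count and loop count are arbitrary.

```python
from multiprocessing import Manager, Process

def add_many(counter, lock, times):
    for _ in range(times):
        with lock:  # counter["n"] += 1 is two proxy calls: a get and a set
            counter["n"] += 1

def run_demo(workers=4, times=50):
    with Manager() as manager:
        counter = manager.dict(n=0)
        lock = manager.Lock()
        procs = [Process(target=add_many, args=(counter, lock, times)) for _ in range(workers)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return counter["n"]

if __name__ == "__main__":
    print(run_demo())  # 200
```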

Process synchronization

Reason: for example, all processes use the same screen (a system resource) to show their results, so without a lock, output from the different processes is liable to get all mixed up.

"""
Suppose a file db.txt holds a ticket count of 1, e.g. {"num": 1}.
If 3 processes buy tickets at the same time without a lock, more than one ticket may be sold.
"""

from multiprocessing import Process, Lock
import json
import time

def showCounts():
    with open("db.txt", "r", encoding="utf-8") as fp:
        tickets = json.load(fp)
    time.sleep(0.1)
    print("\033[41m[%s] ticket(s) left.\033[0m" % tickets["num"])

def getTicket():
    with open("db.txt", "r", encoding="utf-8") as fp:
        tickets = json.load(fp)
    time.sleep(0.1)
    if tickets["num"] > 0:
        tickets["num"] -= 1
        time.sleep(2)
        with open("db.txt", "w", encoding="utf-8") as fp:
            json.dump(tickets, fp)
        print('\033[42mTicket purchased\033[0m')

def main():
    showCounts()
    getTicket()


if __name__ == "__main__":
    for i in range(3):
        p = Process(target=main)
        p.start()
'''
Result: -> three tickets sold, which is wrong
[1] ticket(s) left.
[1] ticket(s) left.
[1] ticket(s) left.
Ticket purchased
Ticket purchased
Ticket purchased
'''

Adding a lock turns concurrent access to db.txt into serial access:

from multiprocessing import Process, Lock
import json
import time

def showCounts():
    with open("db.txt", "r", encoding="utf-8") as fp:
        tickets = json.load(fp)
    time.sleep(0.1)
    print("\033[41m[%s] ticket(s) left.\033[0m" % tickets["num"])

def getTicket():
    with open("db.txt", "r", encoding="utf-8") as fp:
        tickets = json.load(fp)
    time.sleep(0.1)
    if tickets["num"] > 0:
        tickets["num"] -= 1
        time.sleep(2)
        with open("db.txt", "w", encoding="utf-8") as fp:
            json.dump(tickets, fp)
        print('\033[42mTicket purchased\033[0m')

def main(lock):
    showCounts()
    with lock:  # same as lock.acquire() ... lock.release(), but also releases on exceptions
        getTicket()


if __name__ == "__main__":
    procLock = Lock()
    for i in range(3):
        p = Process(target=main, args=(procLock,))
        p.start()
'''
Result: -> only one ticket sold, which is correct
[1] ticket(s) left.
[1] ticket(s) left.
[1] ticket(s) left.
Ticket purchased
'''
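
Process pools

A process pool maintains a set of worker processes. Each task submitted to the pool is handed to one of them. If no process in the pool is free, the task waits until one becomes available.

The problem it solves:

Every new process copies the contents of its parent, which consumes a lot of resources. Without a limit on the number of processes, too many of them can exhaust system memory.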

from multiprocessing import Process, Pool
from multiprocessing import freeze_support  # only needed on Windows when the program is frozen into an executable; harmless otherwise
import time
import os


def Foo(i):
    time.sleep(2)
    print(i+100)
    return i + 100


def Bar(arg):
    print('-->exec done:', arg, os.getpid()) #-> -->exec done: 100 11958


if __name__ == '__main__':
    print(__name__, os.getpid())
    freeze_support()
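    pool = Pool(processes=2)  # at most 2 worker processes run at once; 10 tasks are submitted below
    for i in range(10):
        print("Process->%s" % i)
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # callback: called with func's return value after func finishes,
        # e.g. to write a log entry;
        # the callback runs in the parent process, not in the child

        #pool.apply_async(func=Foo, args=(i,))  # asynchronous -> tasks run in parallel
        #pool.apply(func=Foo, args=(i, ))  # synchronous -> serial: apply() blocks until the result is ready

    print('end')
    pool.close()  # stop accepting new tasks; must be called before join()
    print("pool close.")
    pool.join()  # wait for the workers to finish; without it the main process exits right away and pending tasks may never complete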

Calling get() on the AsyncResult returned by apply_async() gives the function's return value

from multiprocessing import Process, Pool
from multiprocessing import freeze_support  # only needed on Windows when the program is frozen into an executable; harmless otherwise
import time
import os


def Foo(i):
    time.sleep(2)
    print(i+100)
    return i + 100


def Bar(arg):
    print('-->exec done:', arg, os.getpid()) #-> -->exec done: 100 11958


if __name__ == '__main__':
    print(__name__, os.getpid())
    freeze_support()
    pool = Pool(processes=2)
    result = list()
    for i in range(5):
        result.append(pool.apply_async(func=Foo, args=(i,), callback=Bar))

    print('end')
    pool.close()
    print("pool close.")
    pool.join()

    for res in result:
        print("res-->%s" % res.get())
'''
result:
__main__ 936
end
pool close.
100
-->exec done: 100 936
101
-->exec done: 101 936
102
-->exec done: 102 936
103
-->exec done: 103 936
104
-->exec done: 104 936
res-->100
res-->101
res-->102
res-->103
res-->104
'''
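
Sometimes the same function simply has to be applied to many inputs. In that case Pool.map() is a shorter alternative to a loop of apply_async() calls: it blocks until all results are ready and returns them in input order. The sketch below assumes a trivial square() task.

```python
from multiprocessing import Pool

def square(x):
    return x * x

def run_demo():
    # leaving the with block calls pool.terminate(); map() has already
    # collected every result by then, so nothing is lost
    with Pool(processes=2) as pool:
        return pool.map(square, range(5))  # blocks until done, keeps input order

if __name__ == "__main__":
    print(run_demo())  # [0, 1, 4, 9, 16]
```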

Coroutines

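A coroutine is a lightweight "user-space thread": its scheduling is controlled by the user program itself, and all coroutines run inside one thread. Switching between threads is controlled by the operating system, which switches, for example, when a thread blocks on I/O. The goal of coroutines is to avoid that switching overhead (creating and destroying threads, saving registers and stacks, switching between them, and so on) by letting our own program control when tasks switch. Each coroutine therefore has its own register context and stack. When the scheduler switches away, these are saved in memory; when it switches back, the saved context and stack are restored.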

The following uses yield to implement a "pseudo-coroutine":

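import time
import random

def customer(name):
    print("Customer %s enters the shop, ready to eat baozi." % name)
    while True:
        baozi = yield "test"  # pause here; send() resumes with a value
        time.sleep(1)
        print("Customer %s ate %s %s baozi." % (name, baozi[0], baozi[1]))

menu = ["scallion", "chive", "cabbage and vermicelli", "pork", "beef", "preserved-vegetable pork", "red bean"]
def producer():
    c1 = customer("Bruce")  # a function containing yield is a generator function; calling it returns a generator
    c2 = customer("Elvin")  # a generator's body only runs (up to the next yield) when next() or send() is called
    next(c1)  # first run: no value can be sent yet, because execution is at the top of the function, not paused at a yield
    c2.send(None)  # equivalent to next(c2); on the first call only None may be sent
    for thing in menu:
        time.sleep(1)
        print("The shop made a steamer of {} baozi.".format(thing))
        c1_num = random.randint(1, 7)
        c2_num = 8 - c1_num
        c1_eat = [c1_num, thing]
        c2_eat = [c2_num, thing]
        c1.send(c1_eat)
        c2.send(c2_eat)

if __name__ == "__main__":
    producer()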
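
The generator version works because the program itself decides when to switch. That idea can be pushed a little further with a minimal round-robin scheduler over generators. task and run_round_robin are hypothetical names used only for this sketch. Like the example above, it still cannot switch automatically on I/O.

```python
from collections import deque

def task(name, steps, log):
    for i in range(steps):
        log.append("%s:%d" % (name, i))
        yield  # hand control back to the scheduler

def run_round_robin(tasks):
    ready = deque(tasks)
    while ready:
        t = ready.popleft()
        try:
            next(t)          # run the task until its next yield
        except StopIteration:
            continue         # finished: do not reschedule it
        ready.append(t)      # still running: put it at the back of the line

log = []
run_round_robin([task("A", 3, log), task("B", 2, log)])
print(log)  # ['A:0', 'B:0', 'A:1', 'B:1', 'A:2']
```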

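Why is yield called a pseudo-coroutine? Because it fails one of the four conditions a coroutine must meet, namely the fourth: a generator does not switch automatically on I/O. In the example above, time.sleep() simply blocks the whole thread. The conditions are:

1. concurrency must be achieved within a single thread;

2. modifying shared data requires no lock;

3. the user program itself saves the context stacks of multiple control flows;

4. when a coroutine hits an I/O operation, it automatically switches to another coroutine.

Advantages of coroutines:

1. no thread context-switching overhead;

2. no overhead for locking and synchronizing atomic operations (Note 1);

3. easy switching of control flow, which simplifies the programming model;

4. high concurrency, high scalability, low cost: one CPU can easily support tens of thousands of coroutines, so they are well suited to highly concurrent workloads.

Disadvantages of coroutines:

1. They cannot use multiple cores. A coroutine is essentially single-threaded, so it cannot use several cores of a CPU at the same time, and coroutines must be combined with processes to run on multiple CPUs. Most everyday applications do not need this, unless they are CPU-bound.

2. A blocking operation (such as blocking I/O) blocks the entire program.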

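Real coroutines: the gevent and greenlet modules

At its core, a coroutine means that within a single thread, the user program itself switches to another task whenever the current task blocks on I/O, which improves efficiency. To achieve this, we need a solution that meets all of the following:

1. It can control switching between multiple tasks, saving a task's state before switching away so that it can later resume from where it paused.

2. It can detect I/O operations and switch only when an I/O operation is encountered.

3. One more question: if we switch away on I/O, when do we switch back?

First, greenlet

greenlet provides a more convenient way to switch than generators do, but it does not deliver point 2 of the solution (detecting I/O):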

from greenlet import greenlet

def play(name):
    print("{} play 1 times.".format(name))
    crt2.switch("Alvin")  # 2; arguments are passed only on the first switch into a greenlet
    print("{} play 2 times.".format(name))
    crt2.switch() # 4
    print("{} play 3 times.".format(name))
    crt2.switch() # 6

def eat(name):
    print("{} eat 1 times.".format(name))
    crt1.switch() # 3
    print("{} eat 2 times.".format(name))
    crt1.switch() # 5
    print("{} eat 3 times.".format(name))

if __name__ == "__main__":
    crt1 = greenlet(play)
    crt2 = greenlet(eat)
    crt1.switch("Bruce")  # 1; arguments are passed only on the first switch into a greenlet

'''
result:
Bruce play 1 times.
Alvin eat 1 times.
Bruce play 2 times.
Alvin eat 2 times.
Bruce play 3 times.
Alvin eat 3 times.
'''

The third-party library gevent

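gevent is a third-party library that makes it easy to write concurrent code in a synchronous or asynchronous style. Its main primitive is the Greenlet, a lightweight coroutine provided to Python as a C extension module. All greenlets run inside the OS process of the main program, but they are scheduled cooperatively.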

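The answer to question 3 (when to switch back) is callbacks. gevent's event loop asks the operating system to report when an I/O operation can proceed. When the OS signals that the operation is done, a callback switches back to the waiting greenlet.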

Creating greenlets and the main functions

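import gevent

def func(*args, **kwargs):  # hypothetical task, for illustration only
    return args, kwargs

# create a greenlet g1: the first argument to spawn() is the function, the rest are its arguments
g1 = gevent.spawn(func, 1, 2, 3, x=4, y=5)
g2 = gevent.spawn(func)

# wait for the greenlets to finish
g1.join()  # wait for g1
g2.join()  # wait for g2
gevent.joinall([g1, g2])  # wait for every greenlet in the list

# get the return value
print(g1.value)  # func's return value: ((1, 2, 3), {'x': 4, 'y': 5})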

Simulating I/O: how gevent switches

import gevent

def play(name):
    print("{} play 1 times.".format(name))
    gevent.sleep(2)  # simulated I/O: gevent switches to another greenlet here
    print("{} play 2 times.".format(name))
    gevent.sleep(3)
    print("{} play 3 times.".format(name))


def eat(name):
    print("{} eat 1 times.".format(name))
    gevent.sleep(1)
    print("{} eat 2 times.".format(name))
    gevent.sleep(2)
    print("{} eat 3 times.".format(name))
    gevent.sleep(3)
    print("{} eat 4 times.".format(name))

if __name__ == "__main__":
    g1 = gevent.spawn(play, "Bruce")
    g2 = gevent.spawn(eat, "Alvin")
    g3 = gevent.spawn(play, "Amadeus")
    g4 = gevent.spawn(eat, "Google")
    gevent.joinall([g1, g2, g3, g4])

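The examples above use gevent.sleep(sec) to simulate I/O. gevent cannot, by itself, recognize real blocking calls such as time.sleep(sec) or socket operations, so the following program runs serially: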

import gevent
import time

def play(name):
    print("{} play 1 times.".format(name))
    time.sleep(2)  # blocks the whole thread: without patching, gevent cannot switch here
    print("{} play 2 times.".format(name))
    time.sleep(3)
    print("{} play 3 times.".format(name))


def eat(name):
    print("{} eat 1 times.".format(name))
    time.sleep(1)
    print("{} eat 2 times.".format(name))
    time.sleep(2)
    print("{} eat 3 times.".format(name))


if __name__ == "__main__":
    g1 = gevent.spawn(play, "Bruce")
    g2 = gevent.spawn(eat, "Alvin")
    gevent.joinall([g1, g2])

'''
Result:
Bruce play 1 times.
Bruce play 2 times.
Bruce play 3 times.
Alvin eat 1 times.
Alvin eat 2 times.
Alvin eat 3 times.
'''

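This is what monkey.patch_all() is for. It replaces the blocking functions of the standard library (time.sleep, socket, select and others) with cooperative gevent versions, so every such call later in the program becomes a point where gevent switches automatically: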

import gevent
import time

from gevent import monkey  # import the monkey module from gevent
monkey.patch_all()  # must run before the patched I/O functions are used (ideally as early as possible)

def play(name):
    print("{} play 1 times.".format(name))
    time.sleep(2)  # now patched: gevent switches to another greenlet here
    print("{} play 2 times.".format(name))
    time.sleep(3)
    print("{} play 3 times.".format(name))


def eat(name):
    print("{} eat 1 times.".format(name))
    time.sleep(1)
    print("{} eat 2 times.".format(name))
    time.sleep(2)
    print("{} eat 3 times.".format(name))


if __name__ == "__main__":
    g1 = gevent.spawn(play, "Bruce")
    g2 = gevent.spawn(eat, "Alvin")
    gevent.joinall([g1, g2])

'''
Result:
Bruce play 1 times.
Alvin eat 1 times.
Alvin eat 2 times.
Bruce play 2 times.
Alvin eat 3 times.
Bruce play 3 times.
'''

A socket server and client built with gevent

socketServer

import socket
import gevent
from gevent import monkey
monkey.patch_all()


def handle_fun(conn, addr):
    while True:
        try:
            rec = conn.recv(1024)
            if not rec:  # an empty bytes object means the client closed the connection
                print("Connection [{}] closed".format(addr))
                break
            print("From <{}> recv [{}]".format(
                addr,
                rec.decode()
            ))
            conn.send(rec.upper())
        except Exception as ex:
            print("Connection [{}] closed".format(addr))
            print(repr(ex))
            break
    conn.close()


server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server.bind(("127.0.0.1", 5431))
server.listen(5)


while True:
    conn, addr = server.accept()
    print("New connection [{}]".format(addr))
    gevent.spawn(handle_fun, conn, addr)  # one greenlet per client

Client

import socket
import gevent
import time
import random
from gevent import monkey
monkey.patch_all()

def client_soc(msg):
    client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    client.connect(("127.0.0.1", 5431))
    while True:
        time.sleep(random.randint(1, 10))
        if isinstance(msg, int):
            msg += 1
        else:
            msg = int(msg)
            msg += 1
        if not msg:
            continue
        elif msg > 150:
            break
        msg = str(msg)
        client.send(msg.encode())
        print(client.recv(1024))
    client.close()  # close the connection once msg exceeds 150

list1 = []
for i in range(100):
    list1.append(gevent.spawn(client_soc, i))

gevent.joinall(list1)

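Note 1: An atomic operation is one that cannot be interrupted by the thread scheduler. Once it starts, it runs to completion without any context switch (to another thread) in between. An atomic operation may be a single step or several steps, but their order cannot be disturbed, and they cannot be split so that only part of them executes. Being indivisible as a whole is the core of atomicity.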
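
For example, counter += 1 is not a single operation. The standard dis module shows that it compiles to separate load, add and store instructions, so the language does not guarantee that no other thread runs between the read and the write. Exact opcode names vary between Python versions, so the printed list is not reproduced here.

```python
import dis

counter = 0

def increment():
    global counter
    counter += 1  # read, add, write back

ops = [ins.opname for ins in dis.get_instructions(increment)]
print(ops)  # includes LOAD_GLOBAL ... STORE_GLOBAL; the other names vary by version
```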

References:

GIL: https://www.cnblogs.com/value-code/p/8572852.html

     https://blog.csdn.net/weixin_41594007/article/details/79485847

Process locks: https://blog.csdn.net/qq_36357820/article/details/88743497

Process pools: https://www.cnblogs.com/kaituorensheng/p/4465768.html

General reference: https://www.cnblogs.com/alex3714/articles/5230609.html

Original post: https://www.cnblogs.com/FcBlogPythonLinux/p/12552868.html