Python:线程、进程和协程

首先,推荐一篇讲解进程与线程关系的漫画:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html

线程

  在平时,我们如果要执行一个任务,需要排队执行,但是我们有了线程和进程就不一样了。比如,公司只有我和老板,有一天,老板给我派任务了,只有我一个人来做任务,用了一天完成,这就叫单线程;又有一天,老板又派了同样的任务,并且招了好几个技术人员,让我和他们一起完成任务,结果只用了一个小时,这就叫多线程

  我们写一段代码,编译器从上到下读的过程叫主线程,遇到threading.Thread就叫子线程。

threading模块

threading用于提供线程相关的操作,线程是应用程序中工作的最小单元

import threading
import time

def worker(num):
    time.sleep(1)
    print("Thread %d" % num)

for i in range(10):
    #args里面的参数必须是元组
    t = threading.Thread(target=worker,args=(i,),name = "t.%d" % i)
    t.start()
    print(t.name)#线程名

上述代码中创建了10个线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行命令。

  • start            线程准备就绪,等待CPU调度
  • setName      为线程设置名称
  • getName      获取线程名称
  • setDaemon   设置为后台线程或前台线程(默认)

                   如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                   如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

  • join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
  • run              线程被cpu调度后自动执行线程对象的run方法

setdaemon:不等待子线程执行完就关闭

import threading
import time
def f0():
    pass
def f1(a1,a2):
    time.sleep(10)
    f0()

t1 = threading.Thread(target=f1,args=(123,111))
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=f1,args=(123,111))
t2.setDaemon(True)
t2.start()

t3 = threading.Thread(target=f1,args=(123,111))
t3.setDaemon(True)
t3.start()

等待子线程执行完毕才关闭

import threading
import time
def f0():
    print("f0")
def f1(a1,a2):
    time.sleep(10)
    f0()

t1 = threading.Thread(target=f1,args=(123,111))
t1.start()
t2 = threading.Thread(target=f1,args=(123,111))
t2.start()
t3 = threading.Thread(target=f1,args=(123,111))
t3.start()

join:等当前线程执行结束后再执行下一个线程,可传参数,参数表示最多等的秒数

import threading
import time
def f0():
    print("f0")
def f1(a1,a2):
    print("f1")
    f0()

t1 = threading.Thread(target=f1,args=(123,111))
t1.start()
t1.join()
t2 = threading.Thread(target=f1,args=(123,111))
t2.start()
t2.join()

线程锁

我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现脏数据(不准确的数据),为了保证数据的准确性,就需要加把锁。

  锁有两种:RLock和Lock,这里我们使用RLock。

import threading
import time

globals_num = 0
lock = threading.RLock()

def func():
    lock.acquire() # 获得锁
    global globals_num
    globals_num +=1
    time.sleep(1)
    print(globals_num)
    lock.release() #释放锁

for i in range(10):
    r = threading.Thread(target=func)#创建线程锁
    r.start()

 

RLock和Lock的区别:

Lock:获得锁之后必须释放锁才能再次获得锁,不然会产生死锁,就会一直等待释放锁。

import threading
lock = threading.Lock()
lock.acquire()
lock.acquire() #产生死锁
lock.release()
lock.release()

import threading
lock = threading.Lock()
lock.acquire()
print("1")
lock.release()
print("ok")
lock.acquire()
print("2")
lock.release()

RLock:获得几把锁就要释放几把锁。

import threading
rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
rlock.release()
rlock.release()
print("end")

import threading
rlock = threading.RLock()
rlock.acquire()
print("1")
rlock.acquire()
print("2")
rlock.release()
print("3")
rlock.release()
print("4")

event

Python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
  • wait:阻塞线程,直到event对象内部标识位被设为True时。
import threading

def do(event):
    print('start')
    event.wait()
    print('execute')

event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

event_obj.clear()
inp = input('input:')
if inp == 'true':
    event_obj.set()

queue模块

Queue就是队列,规则是先进先出。这个模型也叫生产者-消费者模型。

  生产者-消费者:创建一个为10的队列,生产12个,只消费10个。

import queue
import threading

message = queue.Queue(10) # 创建队列

def producer(i):
    print("put:",i)
    message.put(i)

def consumer(i):
    print("get:",i)
    msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

join:等到队列为空时,再继续往下执行

put(item, block=True, timeout=None):放入队列尾部

get(block=True, timeout=None):获取队列第一个值

put_nowait:等效于 put(item,block=False)

get_nowait:等效于 get(item,block=False)

线程池

  先说一下线程池是什么鬼!之所以这么说,是因为我本以为很简单的东东,结果好复杂...但是!懂了之后,又觉得还好还好,没有特别难。其实就是用最少的劳动力,创造出最多的利益!线程池里面有很多线程,同时还有一个任务队列。执行任务过程就是,也就是说重复利用线程来执行任务,减少系统资源的开销。

  简单版线程池是线程执行完任务之后销毁,如果还有任务,就再创建线程去执行。

  高级版线程池就是线程执行完任务之后回来告诉回调函数执行完毕,回调函数不销毁它,而是把它作为空闲线程,不用创建新线程,如果任务队列中还有任务,就让它再去执行任务。

  so easy!妈妈再也不用担心我会掉进各种池啦......

简易版线程池(武大神说这是low版本):

import queue
import threading
import time

class ThreadPool(object):
    def __init__(self,max_num=20):
        self.queue = queue.Queue(max_num) # 长度为20的队列
        for i in range(max_num):
            self.queue.put(threading.Thread) # threading.Thread是类对象,放入队列中

    def get_thread(self):
        return self.queue.get() #获取队列中的类对象

    def add_thread(self):
        self.queue.put(threading.Thread)

def func(pool,a1):
    time.sleep(1)
    print(a1)
    pool.add_thread()

p = ThreadPool(10)

for i in range(50):
    thread = p.get_thread() #threading.Thread #得到一个线程
    t = thread(target=func,args=(p,i)) # 执行func函数
    t.start()

高级版线程池:

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue() # 创建队列
        self.max_num = max_num # 最多创建线程数量

        self.terminal = False # 默认为False
        self.generate_list = []  # 实际创建的线程
        self.free_list = [] # 空闲的线程

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: # 如果空闲列表为0并且实际创建的线程列表长度小于最多创建的线程数
            self.generate_thread() # 创建线程
        w = (func, args, callback,) # 将任务元组赋值给w变量
        self.q.put(w) # 将任务放入队列中

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread # 创建当前线程
        self.generate_list.append(current_thread) # 将线程添加到实际创建线程列表中

        event = self.q.get() # 等待着去队列中获取线程
        while event != StopEvent: # 队列中没有停止符

            func, arguments, callback = event # 获得任务
            try:
                result = func(*arguments) #函数
                status = True #赋值True
            except Exception as e:
                status = False # 发生错误赋值False
                result = e # 错误类型

            if callback is not None: # 回调函数不为空
                try:
                    callback(status, result) # 执行回调函数
                except Exception as e:
                    pass

            if self.terminal: # False
                event = StopEvent
            else:
                with self.worker_state(self.free_list,current_thread): #with:上下文管理
                    event = self.q.get() #before:append;after:remove

        else:
            self.generate_list.remove(current_thread) #没有任务时,就把实际创建线程列表里的线程删掉

    @contextlib.contextmanager # 可以实现上下文管理的装饰器
    def worker_state(self, lis, val):
        lis.append(val)
        try:
            yield
        finally:
            lis.remove(val)


    def close(self):
        num = len(self.generate_list) #时间创建线程列表长度
        while num:
            self.q.put(StopEvent) #停止符放入已创建的线程中
            num -= 1

    # 终止线程(清空队列)
    def terminate(self):
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()

def work(i):
    print(i)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,))

pool.terminate() # 执行一下就关闭

ps:这里要补充一个知识点。

实现上下文管理(高级版本线程池中有出现)。需要导入模块,然后加上装饰器就ok。

示例:

import contextlib

@contextlib.contextmanager
def myopen(file_path,mode):
    f = open(file_path,mode,encoding="utf-8")
    try:
        yield f
    finally:
        f.close()
with myopen(
"index.html","r") as file_obj: print(file_obj.readline())

进程

注:由于进程之间的数据需要各自持有一份,所以创建进程需要非常大的开销,也就是占有很大的内存。同一个进程中,线程跟线程之间的内存是共享的。

创建进程:multiprocessing模块

from  multiprocessing import Process

def f(name):
    print("hello",name)

if __name__ == "__main__":
    p = Process(target=f,args=("bob",))
    p.start()
    print("start")
    p.join() # 等待
    print("end")

实现进程数据共享

#默认数据没有共享
from multiprocessing import Process,Manager

def Foo(i,dic):
    dic[i] = 100 + i
    print(dic)
    for k,v in dic.items():
        print(k,v)
if __name__ == "__main__":
    manage = Manager()
    dic = {}
    for i in range(2):
        p = Process(target=Foo,args=(i,dic,))
        p.start()
        p.join()
#打印结果:
{0: 100}
0 100
{1: 101}
1 101
#实现数据共享
from multiprocessing import Process,Manager
def foo(i,dic):
    dic[i] = 100 + i
    print(dic)
    print(len(dic))

if __name__ == "__main__":
    manage = Manager()
    dic = manage.dict()
    for i in range(2):
        p = Process(target=foo,args=(i,dic,))
        p.start()
        p.join()
#打印结果:
{0: 100}
1
{0: 100, 1: 101}
2
 'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
类型对应表

 

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池程序中没有可供使用的进程,那么进程就会等待,直到进程池中有可用进程为止。

 进程池中有两个方法:apply_async、apply

apply_async:

一次创建多个进程,并发执行,执行完一个将返回值给回调函数然后执行回调函数
from multiprocessing import Pool
import time

def f1(a):
    time.sleep(1)
    print(a)
    return 1000

def f2(arg):
    print(arg)


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=f1,args=(i,),callback=f2)
        print("111111")
    pool.close()
    pool.join()
join:进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
apply
:
#排队执行,apply里面有join
from multiprocessing import Pool
import time
def f1(a):
    time.sleep(1)
    print(a)

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        pool.apply(func=f1,args=(i,))
        print("111111")

进程与线程之间的关系

  线程是属于进程的,线程运行在进程空间内,并且同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。不管是进程还是线程,都是为了实现一个并发操作。

  io密集型:使用线程;计算密集型:使用进程。根本的原因是Python的线程里面有个锁(只有Python里有这个锁:GIL):全局解释器锁,规定进程里面只有一个线程能出来被CPU调用。

协程

线程在执行任务时,发现要等一段时间才能得到结果,就不等结果,再去执行其他任务。

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

gevent

手动版协程:为了看出执行任务时需要等待,所以我们就睡一会咯(gevent.sleep)

import gevent
def foo():
    print("1")
    gevent.sleep() #表示暂停,去执行下一个
    print("3")

def bar():
    print("2")
    gevent.sleep()
    print("4")

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
#打印结果:
1
2
3
4

终极版协程:终极版就是不用手动让它睡一会就能看出效果的版本!

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print(url,len(data))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://baidu.com/'),
])
#执行结果:需要大家自己去执行才能感同身受呢!!!
原文地址:https://www.cnblogs.com/0820-zq/p/5606545.html