网络编程进阶:并发编程之多线程

多线程:

在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程; 进程的作用就是隔离数据。

进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是CPU上的执行单位。(进程必须靠线程去执行)

线程就类似于一条流水线工作的过程;多线程(即多个控制线程)的概念是:在一个进程中存在多个线程,多个线程共享该进程的地址空间;

线程、进程区别: 

1. 同一个进程内的多个线程共享该进程内的地址资源;

2. 创建线程的开销远小于创建进程的开销(创建一个进程需要向操作系统申请新的地址空间)

开启线程的两种方式(跟开启进程相似):

方式一:

from threading import Thread
import time

def sayhi(name):
    print("hello %s"%name)
    time.sleep(1)
    print("bye %s"%name)

if __name__ == "__main__":
    t1 = Thread(target=sayhi,args=("neo",))
    t1.start()  # t1.start()也是给操作系统发信号,但是操作系统已经不需要开辟新的内存空间
    print("主线程")   #  站在执行的角度看,这个py文件是主线程;站在资源的角度看,这个是主进程

# 运行结果:
# hello neo    # 先打印的是新线程中的内容,因为开辟新线程t1.start()只需要给操作系统发信号,不需要开辟新的内存空间,所以新线程内的任务可以立马执行;所以说创建线程的开销远小于创建进程
# 主线程
# bye neo

 方式二:

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()   # Thread子类的__init__()中,必须先写 super().__init__(),即先继承父类的__init__()再定义自己的逻辑
        self.name = name

    """
    If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) 
    before doing anything else to the thread.
    """

    def run(self):
        print("hello %s"%self.name)
        time.sleep(1)
        print("bye %s"%self.name)

if __name__ == "__main__":
    t1 = MyThread("neo")
    t1.start()
    print("主线程")

# 运行结果:
# hello neo
# 主线程
# bye neo

 查看当前进程的pid也可以利用multiprocessing模块下的 current_process().pid

Thread对象的其他属性或方法:

from threading import Thread,current_thread,active_count,enumerate
import time

def task():
    print("%s is running"%current_thread().getName())  # current_thread()是返回当前的线程变量; # getName()是返回线程名
    time.sleep(1)
    print("%s is done"%current_thread().getName())

if __name__ == "__main__":
    t = Thread(target=task,name="子线程1")  # 线程之间的地位是平等的,没有主、子之分; # name= 要是不写,系统用默认名
    t.start()  # 给操作系统发信号,几乎是瞬间开启新的线程

    t.setName("儿子线程1")  # 更改线程名字;  # 这个例子中,task中的第一个print,里面的线程名还是 “子线程1”,time.sleep(1)之后,第二个print的线程名就变成了“儿子线程1”

    print(active_count())  # 返回当前活跃的线程数
    print(enumerate())  # 返回当前所有活跃的线程对象;列表的形式  # active_count() 就相当于 len(enumerate())

    t.join()   #  主线程卡在这一步,等待t这个线程执行结束
    print(t.is_alive())  # 查看线程是否存活;返回bool值

    # print(t.getName())  # 在“主线程”中也可以查看“子线程”名
    print("主线程名字",current_thread().getName())
    current_thread().setName("主线程名")

运行结果:

守护线程:无论是进程还是线程,都遵循:守护xx在主xx运行完毕后被销毁

需要强调的是:运行完毕并非终止运行

  1. 对主进程来说,运行完毕指的是主进程代码运行完毕;

  2. 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

详细解释:

  1. 主进程在其代码结束后就已经算运行完毕了(守护进程就在此时被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束;

  2. 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就会被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都会被回收,而进程必须保证非守护线程都运行完毕后才能结束

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(2)
    print("end456")

if __name__ == "__main__":
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)

    t1.daemon = True  # 把t1设置成守护线程; # 在start()之前设置
    t1.start()
    t2.start()

    print("main----")

# 运行结果:
# 123
# 456
# main----
# end123
# end456

线程互斥锁:

from threading import Thread,Lock
import time

n = 100

def task():
    global n  # global n 不需要加锁
    mutex.acquire()  # 加锁就是为了局部串行; # 修改共享数据的部分需要加锁
    temp = n
    time.sleep(0.1)  # 0.1秒足以让“主线程”中的那100个“子线程”起来
    n = temp - 1
    mutex.release()  # 通过加锁效率降低

if __name__ == "__main__":
    mutex = Lock()   # threading下面的Lock
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t_list.append(t)
        t.start()

    for t in t_list:
        t.join()

    print("",n)
    
# 运行结果:
# 主 0

GIL:global interpreter lock; 只有CPython才有GIL

GIL是加在CPython解释器上的互斥锁,本质也是一把互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享的数据只能被一个任务所修改,进而保证数据安全

保护不同的数据安全,就应该加不同的锁 (GIL保护的是解释器里关于垃圾回收线程的数据(解释器级别的数据),保护不了你自己的共享数据;想要保护你自己的共享数据,需要你自己加互斥锁)

要想了解GIL,需要确定一点: 每次执行python程序,都会产生一个独立的进程

# 运行一个py文件,启动的就是一个python解释器的进程(python.exe的)

# 运行python文件需要经历3步:
# 1. 先把python解释器的内容加载到内存
# 2. 把py文件的代码从硬盘加载到内存
# 3. 把py文件的内容交给Cpython解释器去执行(py文件里面的代码只是字符串,并不能运行;执行的是python解释器里面C给你实现的功能)
# 注: 第3步可以这么理解: 把python解释器当成一个函数,而你写的py文件代码是python解释器这个函数的参数,然后把py文件的代码当做参数传给python解释器这个函数
# 所以你运行一个py文件,产生的是一个python解释器的进程

GIL与多线程:

有了GIL的存在,同一时刻同一进程中只有一个线程被执行;所以多线程无法利用多核优势

多进程还是多线程?首先要明白:

  1. CPU不是用来做IO的,而是用来做计算的;

  2. 多CPU,意味着可以有多个核并行完成计算,所以多核提升的是计算性能

  3. 每个CPU一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用

结论:

  1. 对计算来说,CPU越多越好,但是对于I/O来说,再多的CPU也没用;

  2. 对运行一个程序来说,随着CPU的增多执行效率肯定会有所提高(不管提升幅度多大,总会有提高),这是因为一个程序基本不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集还是I/O密集型,从而得出用多线程还是多进程

现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率会有显著提升

多线程性能测试:

如果并发的多个任务是计算密集型:多进程效率高

# 计算密集型:多进程效率高

from multiprocessing import Process
from threading import Thread
import time,os

def task():
    res = 0
    for i in range(100000000):
        res *= i

if __name__ == "__main__":
    l = []
    print(os.cpu_count())  # 本机4核
    start = time.time()
    for i in range(4):
        p = Process(target=task)  # 耗时 16.18108034133911秒
        # p = Thread(target=task)    # 耗时 26.78442931175232秒
        l.append(p)
        p.start()

    for i in l:
        i.join()

    stop = time.time()

    print("run time is %s"%(stop-start))

如果并发的多个任务是I/O密集型:多线程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    # print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本机为4核
    start=time.time()
    for i in range(400):
        p=Process(target=work) #耗时32s多,大部分时间耗费在创建进程上
        # p=Thread(target=work) #耗时2.0602962970733643 秒
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

应用:多线程用于IO密集型,如socket,爬虫,web等; 多进程用于计算密集型(多进程能用到多核优势),如金融分析

死锁与递归锁:

死锁:指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,他们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程;如下所示:

from threading import Thread,Lock
import time

mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print("%s 拿到A锁了"%self.name)  # Thread对象下面自带有 name这个属性

        mutexB.acquire()
        print("%s 拿到B锁了" % self.name)

        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print("%s 拿到B锁了" % self.name)
        time.sleep(1)

        mutexA.acquire()
        print("%s 拿到A锁了" % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":
    for i in range(5):
        t = MyThread()
        t.start()

# 运行结果:
# Thread-1 拿到A锁了
# Thread-1 拿到B锁了
# Thread-1 拿到B锁了
# Thread-2 拿到A锁了   # 出现死锁,整个程序阻塞住

递归锁:

解决方法,递归锁:在python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock;

这个RLock内部维护着一个Lock和一个counter(计数器)变量,counter记录了acquire的次数,从而使得资源可以被多次acquire;直到一个线程中所有的acquire都被release,其他的线程才能获得资源;上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

from threading import Thread,RLock
import time

mutexA = mutexB = RLock()  # 一个线程拿到锁,counter +1,该线程内又碰到加锁的情况,则counter持续 +1;这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print("%s 拿到A锁了"%self.name)

        mutexB.acquire()
        print("%s 拿到B锁了" % self.name)

        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print("%s 拿到B锁了" % self.name)
        time.sleep(1)

        mutexA.acquire()
        print("%s 拿到A锁了" % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":
    for i in range(5):
        t = MyThread()
        t.start()

信号量:

信号量也是一种锁,可以指定信号量(例如5),对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有多个任务拿到锁去执行

from threading import Thread,Semaphore,current_thread
import time

def task():
    with sm:
        print("%s get sm"%current_thread().getName())
        time.sleep(1)

    """
    with sm: 和 with open(file)的用法是一样的:
    在with sm会先自动 sm.acquire(), with sm内的代码执行完后会自动 sm.release()
    """

if __name__ == "__main__":
    sm = Semaphore(3)   # 3是指信号量的最大计数量(最多有几把锁),即同一时间最有能有几个线程抢到锁去执行
    for i in range(10):
        t = Thread(target=task)
        t.start()

"""
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器 -1; 调用 release()时内置计数器 +1;
计数器不能小于0,当计数器为0时, acquire()将阻塞线程直到其他线程调用 release()
"""

Event事件:

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,我们就需要使用threading库中的Event对象;对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始状况下,Event对象中的信号被设置为False;如果有线程等待一个Event对象,而这个Event对象的标志为False,那么这个线程将会被一直阻塞到该标志为True。一个线程如果将一个Event对象的信号标志设置为True,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为True的Event对象那么它将忽略这个事件,继续执行

from threading import Event
event.is_set()  # 返回event的状态值
event.wait()  # 如果 event.is_set() == False将阻塞线程  # wait(number)是设置最大等待时长(秒)
event.set()  # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度
event.clear()  # 恢复event的状态值为False

 

from threading import Thread,Event,current_thread
import time

event = Event()  # 实例化一个Event对象

def conn():
    n = 0
    while not event.is_set():   # 这行代码的含义是:event还没有被设置成True,即 还没有经历 event.set()这一步 # .is_set()也该写成 .isSet()
        if n == 6:
            print("%s tried to connect too many times"%current_thread().getName())
            return
        print("%s is trying to connect %s times"%(current_thread().getName(),n))
        event.wait(0.5)  # 最多等0.5s
        n += 1
    print("%s is connected"%current_thread().getName())

def check():
    print("%s is checking"%current_thread().getName())
    time.sleep(5)
    event.set()  # 这一步将event设置成了True

if __name__ == "__main__":
    for i in range(3):
        t = Thread(target=conn)
        t.start()
    t = Thread(target=check)
    t.start()

定时器(Timer):隔一段时间后执行某任务

from threading import Thread,Timer

def task(name):
    print("hello %s"%name)

timer = Timer(3,task,args=("neo",))  # Timer第一个参数interval传入时间间隔(单位:秒),意为几秒之后去执行后面的任务;第二个是函数名;后面传入函数需要传入的参数,两种方式:args和kwargs,args是原则的形式
timer.start()  # 启动这个定时器

"""
Timer是Thread的子类,Timer实例化后得到是一个线程对象
所以,timer.start()就是开启了一个新的线程
"""

利用定时器Timer制作一个定时有效的验证码 :

from threading import Timer
import random

class Code:

    def __init__(self):
        self.code_cache()   # 实例化的时候就先生成一个验证码存放到cache里面

    def code_cache(self):
        self.cache = self.make_code()   # 生成的验证码放到缓存里面
        print(self.cache)
        self.timer = Timer(10,self.code_cache)   # 每隔10秒就重新生成一个线程去调用code_cache()方法
        self.timer.start()

    def make_code(self,n=4):
        res = ""
        for i in range(n):
            s1 = str(random.randint(0,9))
            s2 = chr(random.randint(65,90))  # 65-90是A-Z在ASCII码中对应的位置
            res += random.choice([s1,s2])   # 从s1(数字)和s2(A-Z)中随机选出一个添加到 res 里面
        return res

    def check(self):
        while True:
            code_ipt = input("输入您的验证码:").strip()   # 如果输入的验证码不正确,则每隔10就生成一个新的验证码
            if code_ipt.upper() == self.cache:
                print("验证码正确")
                self.timer.cancel()   # 关闭定时器
                return

code = Code()
code.check()

线程queue:

 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

有三种不同的用法:

class queue.Queue(maxsize=0) # 队列:先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
结果(先进先出):
first
second
third
'''

 class queue.LifoQueue(maxsize=0)  # 堆栈:last in first out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
结果(后进先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0)  # 优先级队列: 存储数据时可设置优先级的队列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())

'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

基于多线程的套接字通信:

服务端:

from socket import *
from threading import Thread

def comm(conn):
    while True:
        try:
            data = conn.recv(1024)
       if not data: break conn.send(data.upper())
except ConnectionResetError: conn.close() break def server(ip,port): server = socket(AF_INET,SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn,client_addr = server.accept() t = Thread(target=comm,args=(conn,)) t.start() server.close() if __name__ == "__main__": server("127.0.0.1",8080)

"""
主线程也是一个线程,所以可以让主线程去做建链接的任务,开启新的“子线程”去做通信的任务
"""

线程池进程池:(池的目的是对数目加以限制)

基于多进程或多线程能实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务端开启的进程数或线程数都会随着并发的客户端数目增多而增多,这会对服务端主机带来巨大的压力甚至瘫痪,所以我们必须要对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,例如进程池就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor # 线程池,提供异步调用
ProcessPoolExecutor  # 进程池,提供异步调用
# 进程池:

from
concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random def task(name): print("name:%s pid:%s run"%(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == "__main__": pool = ProcessPoolExecutor(4) # 4表示池子里面最多能放的进程数;如果不写,默认是CPU数 for i in range(10): pool.submit(task,"neo%s"%i) # pool.submit(func,args,kwargs) 第一个参数写函数名,后面写函数需要传入的参数;# pool.submit()是异步提交任务 # 异步提交任务:提交完任务后立马就走,不需要管任务有没有起来,也不需要拿结果 print("") """ 进程池 pool = ProcessPoolExecutor(4) 从始至终最多只会有4个进程;如下图运行结果的pid所示 """

线程池:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import os,time,random

def task():
    print("thread name:%s pid:%s run"%(current_thread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == "__main__":
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task)

    pool.shutdown()  # shutdown()的作用是:相当于pool.close() + pool.join(),即关闭线程池的入口,并且等待池内所有线程执行完毕; # 默认 shutdown(wait=True)

    print("")

 提交任务的两种方式:

  1. 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,在执行下一行代码(会导致程序变成串行执行)

  2. 异步调用:提交完任务后,不会等待任务执行完毕

回调函数: 可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务执行完后的对象当做参数传入,该函数称为回调函数

from concurrent.futures import ThreadPoolExecutor
import time,random

def task1(name):
    print("%s is running"%name)
    time.sleep(random.randint(2,3))
    length = random.randint(5,12)*"#"
    return {"name":name,"length":length}

def task2(obj):
    value_of_task1 = obj.result()  # obj是pool.submit(task1,"neo")执行完后的一个对象,需要利用 result()方法取出对象中的值,即 task1 return出来的值
    name = value_of_task1["name"]
    length = len(value_of_task1["length"])
    print("NAME:%s  LENGTH:%s"%(name,length))

if __name__ == "__main__":
    pool = ThreadPoolExecutor(13)

    pool.submit(task1,"neo").add_done_callback(task2)  # 这就是异步调用函数; # pool.submit(task1,"neo") 异步调用,执行完毕后会自动调用后面的task2函数,task2这个函数有且只能有一个参数,在给task2传参数时,会把pool.submit(task1,"neo")当成一个对象传入task2的那个参数中,所以task2中需要利用 result()把这个对象中的值取出来

    pool.submit(task1,"alex").add_done_callback(task2)
    pool.submit(task1,"egon").add_done_callback(task2)

"""
同步调用的方法:
如在 __main__ 中
res = pool.submit(task1,"neo").result()   # 同步调用; 此时提交完任务后程序就在这一步原地等待,直到新开的线程task1执行完毕后再利用 result()得到了task1中的返回值,程序才继续往下执行
"""

模拟爬去数据:

# requests模块可模拟浏览器的行为
# requests.get(url) # 到目标地址去下载一个文件到本地
# requests.get(url).text # 目标网页的内容


# 模拟爬虫:异步调用+回调函数的应用
import requests,time
from concurrent.futures import ThreadPoolExecutor

def get(url):
    print("getting %s"%url)
    response = requests.get(url)
    time.sleep(3)  # 模拟网络IO
    return {"url":url,"content":response.text}  # response.text是所下载网页中的内容

def parse(res):
    res = res.result()  # 取得 pool.submit(get,url)的结果
    url = res["url"]
    length = len(res["content"])
    print("parse %s res is %s"%(url,length))

if __name__ == "__main__":
    urls = [
        "https://docs.python.org/3/library/",
        "https://www.python.org/",
        "https://pypi.python.org/pypi"
    ]
    # 应该采用异步调用,保证爬取任务能并发执行
    # 爬去任务是IO密集型,因为get任务会遇到网络IO(下载网页等),所以应该采用多线程
    # 线程也不能无限增加,需要对线程数量做一个设置,所以应该采用 concurrent.futures 的 ThreadPoolExecutor

    pool = ThreadPoolExecutor(2)  # 提交了3个任务,但线程池最多能容纳2个,所以第3个任务会等池子中的某个任务执行完毕后再去执行
    for url in urls:
        pool.submit(get,url).add_done_callback(parse)

线程池的套接字通信:

from socket import *
from concurrent.futures import ThreadPoolExecutor

def comm(conn):
    conn = conn
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            conn.close()
            break

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,client_addr = server.accept()
        pool.submit(comm,conn)  # 直接把comm放入线程池中就行,不需要回调函数

    server.close()

if __name__ == "__main__":
    pool = ThreadPoolExecutor(2)
    server("127.0.0.1",8081)  # 主线程还是用来干建链接的活

# 运行结果:
# 通过线程池控制了线程数,同一时间最多只能有两个服务端通信
原文地址:https://www.cnblogs.com/neozheng/p/8666563.html