并发编程之多线程

一、什么是线程

在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程

线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程

车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线  流水线的工作需要电源,电源就相当于cpu

所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

开启线程的两种方式

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主线程')
方式一
from threading import Thread
import time

class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)

if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主线程')
方式二 

二、线程进程的区别(挺重要)

线程 和进程的区别
进程是一个资源单位一个进程可以包含多个线程多个线程之间数据可以共享线程开销比进程小
在多线程中CPU的切换速度会非常快 但资源消耗没有进程高

英文锻炼:

1、Threads share the address space of the process that created it; processes have their own address space.
2、Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
3、Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
4、New threads are easily created; new processes require duplication of the parent process.
5、Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
6、Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
  多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。详细的讲分为4点:

  1. 多线程共享一个进程的地址空间

      2. 线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用

      3. 若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。

      4. 在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开进程开销要小的多。(这一条并不适用于python)
为何要用多线程 

三、tread对象的其他属性或方法

Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
介绍
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程
    print(threading.active_count())
    print('主线程/主进程')
验证
MainThread
<_MainThread(MainThread, started 140735268892672)>
[<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
主线程/主进程
Thread-1
执行结果
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主线程')
    print(t.is_alive())
主线程等待子线程结束
egon say hello
主线程
False
执行结果

四、守护线程

无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

需要强调的是:运行完毕并非终止运行

1、对主进程来说,运行完毕指的是主进程代码运行完毕

2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必须在t.start()之前设置
    t.start()

    print('主线程')
    print(t.is_alive())
验证
主线程
True
执行结果

五、GIL全局解释器锁(特别之处 记住)

什么是GIL?   全局解释器锁 仅存在与Cpython
为什么需要它?   在同一时间只有一个线程在使用解释器
如果程序中只有一个线程还需要吗?  解释器会自己启动垃圾回收机制,也会造成解释器的竞争问题

例如 GC发现变量x引用计数为0 正准备清扫 CPU突然切换到了另一个线程a
a拿着x进行使用 在使用的过程中 又切换到了GC GC接着把X指向的空间进行释放
这样一来a中的x就无法使用了 GIL将分配内存回收内存相关的操作加了锁
GIL无法避免自定义的线程中的数据竞争问题

GIL带来的问题?

加锁虽然保证了数据的安全 但是降低了性能 在多CPU的机器上 无法利用多核提升效率
其他线程要想执行 必须等到之前的线程释放了GIL 这就意味着 同一时间只有一个线程在运行

为什么不用其他解释器?

因为cpython是c语言实现 可以无缝对接c现有的所有库 就是很多现成的功能

GIL 和 自定义互斥锁的异同点
相同点:都是互斥锁 争抢执行权是无序的 执行被锁定的代码时有序的
不同点:GIL锁的是解释器的数据 自定义互斥锁所得是用户自定义的数据

GIL的加锁与解锁 是自动执行的
自动释放的时间点: io/代码执行完毕 同一线程执行时间过长3ms(py3中)
执行的字节码指令数量达到一定值(py2中)

from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全,不加锁则结果可能为99
代码示范

六、死锁现象与递归锁

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('33[41m%s 拿到A锁33[0m' %self.name)

        mutexB.acquire()
        print('33[42m%s 拿到B锁33[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('33[43m%s 拿到B锁33[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('33[44m%s 拿到A锁33[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
死锁
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁 #出现死锁,整个程序阻塞住
执行结果

解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('33[41m%s 拿到A锁33[0m' %self.name)

        mutexB.acquire()
        print('33[42m%s 拿到B锁33[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('33[43m%s 拿到B锁33[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('33[44m%s 拿到A锁33[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
View Code

七、同步异步 阻塞和非阻塞 异步回调

同步:提交任务需要等待任务执行完成才能继续执行
异步:提交任务不需要等待任务执行 可以立即继续执行
指的都是提交任务的方式
阻塞:遇到IO 失去了CPU执行权 看上去也是在等 与同步会混淆
非阻塞:就绪,运行,代码正常执行
异步回调:
为什么需要回调?
子进程帮助主进程完成任务 处理任务的结果应该交还给准进程
其他方式也可以将数据交还给主进程
1.shutdown 主进程会等到所有任务完成
2.result函数 会阻塞直到任务完成
都会阻塞 导致效率降低 所以使用回调
注意:
回调函数什么时候被执行? 子进程任务完成时
谁在执行回调函数? 主进程
线程的异步回调
使用方式都相同 唯一的不同是执行回调函数 是子线程在执行 

八、线程队列

class queue.Queue(maxsize=0) #队列:先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
结果(先进先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #堆栈:last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
结果(后进先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #优先级队列:存储数据时可设置优先级的队列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())



'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''
import queue
# 普通队列 先进先出
q = queue.Queue()
q.put("a")
q.put("b")


print(q.get())
print(q.get())

# 堆栈队列  先进后出 后进先出  函数调用就是进栈  函数结束就出栈 递归造成栈溢出
q2 = queue.LifoQueue()
q2.put("a")
q2.put("b")
print(q2.get())


# 优先级队列
q3 = queue.PriorityQueue()  # 数值越小优先级越高  优先级相同时 比较大小 小的先取
q3.put((-100,"c"))
q3.put((1,"a"))
q3.put((100,"b"))
print(q3.get())
三种形式

九、线程池和进程池

什么是池? 一种存储数据的容器 要存储的数据是线程或进程
为什么使用? 为了方便管多线程或进程 (在有很多子进程或子线程的情况下)
有什么特点:
1.管理进程的创建
2.管理进程的销毁
3.负责任务的分配
4.控制最大并发数量
注意:用池来处理TCP 是不正确的 因为进程中代码执行完毕才算空闲

1、submit(fn, *args, **kwargs)
异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

3、shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4、result(timeout=None)
取得结果

5、add_done_callback(fn)
回调函数
基本使用方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
进/线程池用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map用法

 回调函数:

可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
view code

十、信号量,事件、定时器

信号量

from threading import Thread,Semaphore,current_thread,active_count

import time
# 用于控制 同时执行被锁定代码的线程数量   也就是线程的并发数量
# 也是一种锁
sm = Semaphore(1)

def task():
    sm.acquire()
    for i in range(10):
        print(current_thread())
        time.sleep(0.5)
    sm.release()

def task2():
     for i in range(10):
        print(current_thread())
        time.sleep(0.5)


for i in range(5):
    Thread(target=task).start()
    Thread(target=task2).start()
print(active_count())

事件

事件是什么? 某件事情发生的信号
用来干什么? 在线程间通讯 然而线程本来就能通讯  作用只有一个就是简化代码

线程间通讯的例子
服务器启动需要五秒
客户端启动后去连接服务器
去连接服务器必须保证服务器已经开启成功了

是否启动完成就是要通讯的内容

注意 Event线程通讯 仅仅用于简单的条件判断 说白了代替bool类型 和if判断
set() 将状态修改为True
wati() 等待状态为True才继续执行

import time
from threading import Thread
boot = False
def server_task():
    global boot
    print("正在启动....")
    time.sleep(5)
    print("启动....成功")
    boot = True

def client_task():
    while True:
        print("连接服务器....")
        time.sleep(1)
        if boot:
            print("连接成功")
            break
        else:
            print("error 连接失败 服务器未启动!!")

t1 = Thread(target=server_task)
t1.start()

t2 = Thread(target=client_task)
t2.start()

t1.join()
t2.join()
# import time
# from threading import Thread,Event
# event =Event()
#
# def server_task():
#     print("正在启动....")
#     time.sleep(5)
#     print("启动....成功")
#     event.set()
#
# def client_task():
#     event.wait() #一个阻塞的函数  会阻塞直到对event执行set函数为止
#     print("连接成功!")
#
# t1 = Thread(target=server_task)
# t1.start()
# t2 = Thread(target=client_task)
# t2.start()
使用事件实现

定时器

定时器,指定n秒后执行某个操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed
原文地址:https://www.cnblogs.com/wanlei/p/9947474.html