Python并发编程之多线程

本节内容

  • 什么是线程
  • 线程与进程的区别
  • 开启线程的两种方式
  • Thread对象的其他属性或方法
  • 守护线程
  • GIL全局解释器锁
  • 死锁和递归锁
  • 信号量 event 计时器
  •  线程queue

一 什么是线程

线程相对于进程更为轻量级,当一个进程启动同时也会启动一个主线程,多线程就是指在一个进程下创建多个线程并且这些线程共享地址空间。所以进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

二 线程与进程的区别

1 Threads share the address space of the process that created it; processes have their own address space.

2 Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.

3 Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.

4 New threads are easily created; new processes require duplication of the parent process.

5 Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.

6 Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

总结上述区别,无非两个关键点,这也是我们在特定的场景下需要使用多线程的原因:

同一个进程内的多个线程共享该进程内的地址资源
创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)

三 开启线程的两种方式

开启线程的方式

方式一

1.创建线程的开销比创建进程的开销小,因而创建线程的速度快
from multiprocessing import Process
from threading import Thread
import os
import time
def work():
    print('<%s> is running'%os.getpid())
    time.sleep(2)
    print('<%s> is done'%os.getpid())

if __name__ == '__main__':
    t=Thread(target=work,)
    # t= Process(target=work,)
    t.start()
    print('',os.getpid())

开启进程的第一种方式
方式一

方式二

from threading import Thread
import time
class Work(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        # time.sleep(2)
        print('%s say hell'%self.name)
if __name__ == '__main__':
    t = Work('egon')
    t.start()
    print('')

开启线程的第二种方式(用类)
方式二

基于多进程多线程实现套接字通信

import socket
from multiprocessing import Process
from threading import Thread


def create_socket():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.3', 8080))
    server.listen(5)
    return server


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if data is None: break
            conn.send(data.upper())
        except ConnectionError:
            break
    conn.close()


def communication(server):
    while True:
        print('wait....')
        conn, add = server.accept()
        t = Thread(target=talk, args=(conn,))
        t.start()


if __name__ == '__main__':
    server = create_socket()
    p1 = Process(target=communication, args=(server,))
    p2 = Process(target=communication, args=(server,))
    p1.start()
    p2.start()
socket

编写一个简单的文本处理工具,具备三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

from threading import Thread

msg_l = []
format_l = []


def user_input():
    while True:
        text = input('请输入内容:')
        if text is None: continue
        msg_l.append(text)


def format_text():
    while True:
        if msg_l:
            reg = msg_l.pop()
            format_l.append(reg.upper())


def save():
    while True:
        if format_l:
            with open('db1.txt', 'a', encoding='utf-8') as f:
                res = format_l.pop()
                f.write('%s
' % res)
                f.flush()


if __name__ == '__main__':
    t1 = Thread(target=user_input)
    t2 = Thread(target=format_text)
    t3 = Thread(target=save)
    
    t1.start()
    t2.start()
    t3.start()
View Code

四 Thread对象的其他属性或方法

Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。 
      # getName(): 返回线程名。
      # setName(): 设置线程名。
threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

五 守护线程

无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
1、对主进程来说,运行完毕指的是主进程代码运行完毕
2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

详细解释:

1、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
2、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

思考下述代码的执行结果有可能是哪些情况?为什么?

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
        print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
以上代码首先会输出 123,456,main, 随后会输出end123,end456。因为t1守护的是主进程,让主进程执行完print("main-------")线程2已经在运行了所以主进程并没有结束,等到子线程运行完毕才会回收子进程的资源进程才会结束

六 GIL全局解释器锁

1 定义:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)

结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。>有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。
然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

2 GIL解析

GIL本身就是一把互斥锁,所有互斥锁本质都是一样的,同一时间内共享数据只能被一个任务所修改进而保证数据安全.
在一个Python进程内不仅有当前任务的主进程或者当前主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程
总之所有线程都运行在一个进程内。
如果多个线程的target=work,那么执行流程是多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行

3 GIL与lock

很多人会有这样一个疑问::Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?
首先GIL与Lock和目的都是为了保护数据安全的,但是他们所保护的数据有所不同,前者是解释器级别(保护的就是解释器级别的数据,比如垃圾回收的数据),
后者保护的是用户自己开的应用程序的数据而GIL确不负责这件事

分析:

  • 100个线程去抢GIL锁,即抢执行权限
  • 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
  • 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
  • 直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

代码演示:

from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全,不加锁则结果可能为99

七 死锁和递归锁

1 死锁的现象

所谓死锁是指两个或两个以上的进程或线程在执行过程中因争夺资源而造成的一种互相等待现象,若无外力作用他们将无法推进下去

死锁代码:

 1 from threading import Thread,Lock
 2 import time
 3 mutexA=Lock()
 4 mutexB=Lock()
 5 
 6 class MyThread(Thread):
 7     def run(self):
 8         self.func1()
 9         self.func2()
10     def func1(self):
11         mutexA.acquire()
12         print('33[41m%s 拿到A锁33[0m' %self.name)
13 
14         mutexB.acquire()
15         print('33[42m%s 拿到B锁33[0m' %self.name)
16         mutexB.release()
17 
18         mutexA.release()
19 
20     def func2(self):
21         mutexB.acquire()
22         print('33[43m%s 拿到B锁33[0m' %self.name)
23         time.sleep(2)
24 
25         mutexA.acquire()
26         print('33[44m%s 拿到A锁33[0m' %self.name)
27         mutexA.release()
28 
29         mutexB.release()
30 
31 if __name__ == '__main__':
32         for i in range(10):
33             t=MyThread()
34             t.start()
35 执行效果
36 
37     Thread-1 拿到A锁
38     Thread-1 拿到B锁
39     Thread-1 拿到B锁
40     Thread-2 拿到A锁 #出现死锁,整个程序阻塞住

2 递归锁

为了解决死锁产生的问题,我们可以使用递归锁,这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

from threading import Thread,RLock
import time
mutexA=mutexB=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

class MyThread(Thread):
	def run(self):
		self.func1()
		self.func2()
	def func1(self):
		mutexA.acquire()
		print('33[41m%s 拿到A锁33[0m' %self.name)

		mutexB.acquire()
		print('33[42m%s 拿到B锁33[0m' %self.name)
		mutexB.release()

		mutexA.release()

      def func2(self):
		mutexB.acquire()
		print('33[43m%s 拿到B锁33[0m' %self.name)
		time.sleep(2)

		mutexA.acquire()
		print('33[44m%s 拿到A锁33[0m' %self.name)
		mutexA.release()

		mutexB.release()

if __name__ == '__main__':
	for i in range(10):
            t=MyThread()
            t.start()    

3 Join与索的区别

有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊
没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是
start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的
单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.

from threading import current_thread, Thread, Lock
import os, time


def task():
    time.sleep(3)
    print('%s start to run' % current_thread().getName())

    global n
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t.join()
stop_time = time.time()
print('主:%s n:%s' % (stop_time - start_time, n))

'''
100 Thread-1 start to run
101 Thread-2 start to run
102 ......
103 Thread-100 start to run
104 主:350.6937336921692 n:0 #耗时是多么的恐怖
105 '''
join实现锁功能

八 信号量 event 计时器 

1 信号量  

 信号量相当于停车场的停车位,限定同时又N个线程同时进入,如果超出就其他线程就要等待,直到里面一个线程将锁释

from threading import Thread,Semaphore
import threading
import time


def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()


if __name__ == '__main__':
    sm = Semaphore(5)
    for i in range(23):
        t = Thread(target=func)
        t.start()
'''   
解析
    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
'''

 2 Event

线程的一个关键特性是每个线程都是独立运行且状态不可预测,如果程序中的其他线程需要通过判断某个线程的状态来确定下一步的操作
为了解决这些问题就需要用到threading中的Event对象了,Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。
一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

Event中的方法

from threading import Event
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。   

示例一:
 9   from threading import Thread, Event
10   import time
11     event = Event()
12     def student(name):
13        print('学生%s 正在听课' %name)
14        event.wait()
15        print('学生%s 课件活动' %name)
16 
17     def teacher(name):
18        print('老师%s 正在上课'%name)
19        time.sleep(2)
20        print('下课了')
21        event.set()
22 
23 if __name__ == '__main__':
24     stu1 = Thread(target=student, args=('sanm',))
25     stu2 = Thread(target=student, args=('beal',))
26     stu3 = Thread(target=student, args=('jerry',))
27    teacher = Thread(target=teacher, args=('harry',))
28    stu1.start()
29    stu2.start()
30    stu3.start()

示例二:

 1 from threading import Thread,Event
 2 import threading
 3 import time,random
 4 def conn_mysql():
 5         count=1
 6         while not event.is_set():
 7             if count > 3:
 8                 raise TimeoutError('链接超时')
 9             print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
10             event.wait(0.5)
11             count+=1
12             print('<%s>链接成功' %threading.current_thread().getName())
13 
14 
15 def check_mysql():
16         print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
17         time.sleep(random.randint(2,4))
18         event.set()
19 
20 if __name__ == '__main__':
21     event=Event()
22     conn1=Thread(target=conn_mysql)
23     conn2=Thread(target=conn_mysql)
24     check=Thread(target=check_mysql)
25 
26     conn1.start()
27     conn2.start()
28     check.start()

示例三:

 1 import threading
 2 import time
 3 class boss(threading.Thread):
 4     def run(self):
 5         event_lock.set()
 6         print("今天要加班")
 7         event_lock.clear()
 8         time.sleep(2)
 9         event_lock.set()
10         print("22:00下班")
11 class worker(threading.Thread):
12     def run(self):
13         event_lock.wait()
14         print('命苦啊.....')
15         time.sleep(1)
16         event_lock.wait()
17         print('耶....')
18 event_lock=threading.Event()
19 threads = []
20 for i in range(5):
21     t = worker()
22     t.start()
23     threads.append(t)
24 b = boss()
25 b.start()

3 定时器

定时器,指定n秒后执行某操作

定时生成验证码的示例:

 1 import random
 2 class code:
 3     def __init__(self):
 4         self.make_cache()
 5 
 6         def make_cache(self, interval=5):
 7         self.cache = self.make_code()
 8         print('
',self.cache)
 9         self.t = Timer(interval,self.make_cache)
10         self.t.start()
11     def make_code(self, n=4):
12         res = ''
13         for i in range(n):
14             s1 =str(random.randint(0,9))
15             s2 =chr(random.randint(65,90))
16             res+=random.choice([s1,s2])
17         return res
18 
19     def check(self):
20         while True:
21             code = input('输入验证码:')
22             if code.upper() == self.cache:
23                 print('验证码输入错误')
24                 self.t.cancel()
25                 break
26                obj = code()
27                obj.check()    

九 线程queue

有三种不同的用法

1 class queue.Queue(maxsize=0) #队列:先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
结果(先进先出):
first
second
third
'''

2 class queue.LifoQueue(maxsize=0) #堆栈:last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
结果(后进先出):
third
second
first
'''

3 class queue.PriorityQueue(maxsize=0) #优先级队列:存储数据时可设置优先级的队列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())


'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

原文地址:https://www.cnblogs.com/harryblog/p/9126046.html