20190102(多线程,守护线程,线程互斥锁,信号量,JoinableQueue)

多线程

多进程: 核心是多道技术,本质上就是切换加保存技术。 当进程IO操作较多,可以提高程序效率。 每个进程都默认有一条主线程。

多线程: 程序的执行线路,相当于一条流水线,其包含了程序的具体执行步骤。 操作系统是工厂,进程就是车间,线程就是流水线。 同一个进程的线程PID相同

线程和进程的关系: 进程包含了运行程序的所有资源,同一进程内的线程们共享该资源。不同进程内的线程资源是隔离的。 进程是一个资源单位,线程是CPU的最小执行单位! 每一个进程一旦被创建,就默认开启了一条线程,该线程称之为主线程。 一个进程可以包含多个线程。进程包含线程,线程依赖进程。

为什么使用线程: 提高程序效率,进程对操作系统的资源占用较高。 线程是如何提高效率的: 默认情况下,每个进程有且只有一条主线程,执行代码时如果遇到IO,系统就会切换到其他应用程序(其实是切换线程),这会降低当前应用程序的效率,CPU切换是在不同线程之间进行切换,多线程可以使CPU在一个进程内切换,从而提高程序对CPU的占用率。

什么时候启用多线程: 当程序遇到IO时 但如果程序是纯计算时,使用多线程无法提高效率。

进程和线程的区别: 进程对于操作系统的资源占用非常高,而线程比较低(是进程的十分之一到百分之一) 同一个进程内的线程共享该进程的资源

 

如何使用多线程: 开启线程的两种方式: 1、实例化Thread类 2、继承Thread类,覆盖run方法

# 实例化Thread
from threading import Thread

def task():
   print("threading run")
t1 = Thread(target=task)  # 形式与进程类似
t1.start()  # 手动启动线程,不用像进程那样放到main下
print("over") # 子线程可能更快,也可能慢

# 继承Thread类,覆盖run方法
class MyThread(Thread):
   def run(self):
       print("子线程 run")

MyThread().start()
print("over")

 

线程与进程的资源占用对比:

from multiprocessing import Process
from threading import Thread
import time


def task():
   print("子进程run")

if __name__ == '__main__':
   start = time.time()
   ps = []
   for i in range(100):
       p = Process(target=task)
       p.start()
       ps.append(p)
   for p in ps:
       p.join()  # 这一步是为了让所有子进程都执行完毕后在执行主进程,进而得到准确的运行时间
   print(time.time()-start)  #我的电脑用了74s

start = time.time()
ts = []
for i in range(100):
   t = Thread(target=task)
   t.start()
   ts.append(t)
for t in ts:
   t.join()  # 子线程都结束后再执行主线程
print(time.time() - start)  # 0.05s 效率提升了上千倍

同一进程内的线程资源共享

from threading import Thread
import time

x = 100
def task():
   global x
   x = 0
t = Thread(target=task)
t.start()
time.sleep(0.1) # 子线程先运行
print(x)  # 输出为0

from threading import Thread
import time

x = 100
def task():
   global x
   time.sleep(0.1) # 主线程先运行
   x = 0
t = Thread(target=task)
t.start()
print(x)  # 输出为100

 

守护线程

守护线程:在所有非守护线程结束后结束的线程。 一个程序中可以有多个守护线程,非守护线程都结束后,多个守护进程一起死。

# 调节下面的两处时间,可以得到不同的结果
# 两处时间都取消时,守护进程可能也会完全打印,主要是因为主线程在打印结束后需要时间去结束(并不是说只要主线程打印结束立马主线程就完全结束,结束也是需要时间的),而线程速度极快,所以可能完全执行。当然,也可能部分执行,这取决于CPU的运行情况。
from threading import Thread
import time

def task():
   print("子线程运行。。。")
   # time.sleep(1)
   print("子线程运行。。。")
t = Thread(target=task)
t.setDaemon(True)  # 要放到子线程开始前,注意进程的守护进程是deamon,线程的守护线程是setDeamon
t.start()
# time.sleep(0.1)
print("over")

 

线程中常用属性

 

from threading import Thread,current_thread,enumerate,active_count

import os

def task():
   print("running...")
   print("子线程",os.getpid())  # 主线程和子线程PID相同
   print(current_thread())  # 获取当前线程对象
   print(active_count())  # 获取正在运行的线程个数

t1 = Thread(target=task)
t1.start()
print("主线程",os.getpid())

print(t1.is_alive())  # t1线程是否还在运行
print(t1.isAlive())   # 同上,只是两种不同写法,官方文档中显示一模一样:isAlive = is_alive
print(t1.getName())  # 获取子线程t1的线程名称(Thread-1)(即第一个子线程)

print(enumerate())   # 获取所有线程对象列表
输出结果:
[<_MainThread(MainThread, started 19516)>, <Thread(Thread-1, started 9072)>]

 

线程互斥锁

当多个线程需要同时修改同一份数据时,可能会造成数据错乱,此时需要互斥锁。(类似于进程互斥锁)

不用线程互斥锁的情况:

a = 100
from threading import Thread
import time
def task():
   global a
   temp = a - 1
   time.sleep(0.01)  # 强行增加子线程运行时间,由于没有互斥锁,主线程的a会错乱
   a = temp  # 故意增加步骤,延长线程运行时间
for i in range(100):
   t = Thread(target=task)
   t.start()

print(a) # 输出结果可能为98或99等等(都有可能)

解决办法,加入线程互斥锁,并且添加join确保子线程全部运行结束:

a = 100
from threading import Thread,Lock
import time
lock = Lock()
def task():
   lock.acquire()  # 与进程互斥锁一致
   global a
   temp = a - 1
   time.sleep(0.01)
   a = temp
   lock.release()   # 拿到一次锁就必须释放一次
ts = []
for i in range(100):
   t = Thread(target=task)
   t.start()
   ts.append(t)
for t in ts:
   t.join()  # 确保所有子进程都结束

# t.join() # 使用此方法不行,因为这样只能等待最后一个子线程,但是无法保证最后一个子线程是最后执行的,如果最后一个子线程却先于一些线程执行完毕,那么还是会出问题。
print(a)  # 结果为0

 

信号量

信号量也是一种锁,特点是可以设置一个数据可以被几个线程(进程)共享。

与Lock的区别:
  Lock锁,数据同时只能被一个线程使用
  信号量,可以指定数据同时被多个线程使用

使用场景:
  限制一个数据被同时访问的次数,保证程序/操作系统的稳定
  比如一台电脑设置共享,限制连接的终端,保证CPU/磁盘不至于过载导致错乱或崩溃

例子:

from threading import Semaphore,Thread,current_thread
import time

sem = Semaphore(3)  # 信号量设定为3,一份数据最多允许三份子线程访问

def task():
   sem.acquire()  # 和锁的用法类似
   print("%s run" %current_thread())
   time.sleep(1)
   sem.release()

for i in range(10):
   t = Thread(target=task)
   t.start()
   
输出结果:(三个三个一输出)
<Thread(Thread-1, started 13944)> run
<Thread(Thread-2, started 27096)> run
<Thread(Thread-3, started 26108)> run
<Thread(Thread-4, started 20580)> run
<Thread(Thread-5, started 20304)> run
<Thread(Thread-6, started 10676)> run
<Thread(Thread-8, started 18256)> run
<Thread(Thread-7, started 22684)> run
<Thread(Thread-9, started 18532)> run
<Thread(Thread-10, started 7036)> run

生产者消费者+守护进程

应用:生产者和消费者通过队列交换数据,都结束后程序再结束。

这里先补充一个其他知识:

进程队列的另一个类JoinableQueue

JoinableQueue同样通过multiprocessing使用。

创建队列的另外一个类:

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。

方法介绍:

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

简言之:(如果还不理解看最下面的官方文档)

task_done()表示队列中的某个任务已经完成了,会返回给join一个信号。 join()会一直阻塞直到队列中的所有任务已经被获取并处理。一旦任务被加入队列,任务计数会加1,一旦收到task_done()的返回信号,任务计数会减1,如果任务计数减为0,则join()解除阻塞,程序也就能继续运行了。

import time,random
from multiprocessing import JoinableQueue,Process


def eat_hotdog(name,q):
   while True:  # 因为是守护进程,所以只要主进程结束,该循环也就结束了。
       res = q.get()
       print("%s 在吃 %s" %(name,res))
       time.sleep(random.randint(1,2))
       q.task_done()  # 记录当前已经被处理的数据的数量,该方法是JoinableQueue下的方法。


def make_hotdog(name,q):
   for i in range(1,5):
       res = "第%s热狗" %i
       print("%s 生产了 %s" % (name, res))
       q.put(res)


if __name__ == '__main__':
   q = JoinableQueue()
   p1 = Process(target=make_hotdog,args=("徐福记热狗店",q))
   p1.start()

   p3 = Process(target=make_hotdog,args=("万达热狗店",q))
   p3.start()

   p2 = Process(target=eat_hotdog,args=("思聪",q))
   p2.daemon = True  # 此处加入守护进程,确保主程序结束后,该进程的while循环也会结束
   p2.start()

# 生产者全部生产完成
   p1.join()
   p3.join()

# 保证队列为空,全部被处理
   q.join()  # 该join方法是JoinableQueue下的方法,并不是上面的那种join方法
   
输出结果:
徐福记热狗店 生产了 第1热狗
徐福记热狗店 生产了 第2热狗
徐福记热狗店 生产了 第3热狗
徐福记热狗店 生产了 第4热狗
万达热狗店 生产了 第1热狗
万达热狗店 生产了 第2热狗
万达热狗店 生产了 第3热狗
万达热狗店 生产了 第4热狗
思聪 在吃 第1热狗
思聪 在吃 第2热狗
思聪 在吃 第3热狗
思聪 在吃 第4热狗
思聪 在吃 第1热狗
思聪 在吃 第2热狗
思聪 在吃 第3热狗
思聪 在吃 第4热狗

官方文档:

JoinableQueue是由Queue派生出来的一个类,与Queue不同,JoinableQueue有task_done()和Join()方法。

task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done()tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.

join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

 

原文地址:https://www.cnblogs.com/realadmin/p/10209564.html