多线程 vs 多线程
- 程序:一堆代码以文本形式存入一个文档
- 进程:程序运行的一个状态
- 包含地址空间,内存,数据栈等
- 每个进程由自己完全独立的运行环境,多进程共享数据是一个问题
- 线程
- 一个进程的独立运行片段,一个进程可以由多个线程
- 轻量化的进程
- 一个进程的多个线程间共享数据和上下文运行环境
- 共享互斥问题
- 全局解释器锁(GIL)
- python代码的执行是由python虚拟机进行控制
- 在主循环中只能有一个控制线程在执行
- Python包
- thread:有问题,不好用,python3改成了_thread
- threading:同行的包
多线程,缩短总时间
import time
import _thread as thread
def loop1():
print('start loop1 at:', time.ctime())
time.sleep(4)
print('End loop1 at', time.ctime())
def loop2():
print('Start loop2 at', time.ctime())
time.sleep(2)
print('End loop2 at', time.ctime())
def main():
print('Starting at', time.ctime())
# 启动多线程的意思是用多线程去执行某个函数
# 启动多线程函数为start_new_thead
# 参数两个,一个需要运行的函数名,第二是函数的参数作为元组使用,为空则使用空元组
# 注意:如果函数只有一个参数,需要参数后有一个逗号
thread.start_new_thread(loop1,())
thread.start_new_thread(loop2,())
print('All done at', time.ctime())
if __name__ == '__main__':
main()
while True:
time.sleep(1)
threading的使用
- 直接使用threading.Thread生成Thread实例
- t =threading.Thread (target=函数名(无括号), args(参数)=(xxx,))
- t.start():启动多线程
- t.join():等待多线程执行完成
- 案例02
- 如果在程序中将子线程设置成守护线程,则子线程会在主线程结束的时候自动退出
- 一般认为,守护线程中不允许离开主线程独立运行
- 守护线程案例能否有效果跟环境相关
- 案例03
# 案例02
def loop1(in1):
print('start loop1 at:', time.ctime())
print("我是参数", in1)
time.sleep(4)
print('End loop1 at:', time.ctime())
def loop2(in1, in2):
print('Start loop2 at:', time.ctime())
print("我是参数1", in1, "和参数 ", in2)
time.sleep(2)
print('End loop2 at:', time.ctime())
def main():
print('Starting at:', time.ctime())
t1 = threading.Thread(target=loop1, args=('蒋老大',))
t1.start()
t2 = threading.Thread(target=loop2, args=('蒋俊才','王晨熙'))
t2.start()
t1.join()
t2.join()
print('All done at:', time.ctime())
if __name__ == '__main__':
main()
# 一定要有while语句
# 因为启动多线程后本程序就作为主线程存在
# 如果主线程执行完毕,则子线程可能也需要终止
while True:
exit()
# 案例03
def fun():
print('start fun')
time.sleep(2)
print('end fun')
print('Main thread')
t1 = threading.Thread(target=fun, args=())
# 设置守护线程的方法,必须在start之前设置,否则无效
t1.setDaemon(True)
# t1.daemon = True
t1.start()
time.sleep(1)
print('Main thread end')
线程常用属性
- Thread实例对象的方法
- isAlive(): 返回线程是否活动的。
- getName(): 返回线程名。
- setName(): 设置线程名。
- threading模块提供的一些方法:
- threading.currentThread(): 返回当前的线程变量。
- threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
- 案例04
- 直接继承自threading.Thread
- 直接继承Thread
- 案例05
# 案例04
def loop1():
print('start loop 1 at:', time.ctime())
time.sleep(4)
print('End loop 1 at:', time.ctime())
def loop2():
print('Start loop 2 at', time.ctime())
time.sleep(2)
print('End loop 2 at', time.ctime())
def loop3():
print('Start loop 3 at', time.ctime())
time.sleep(5)
print('End loop3 at', time.ctime())
def main():
print('Starting at:', time.ctime())
t1 = threading.Thread(target=loop1, args=())
# setName是给每一个子线程设置一个名字
t1.setName('THR_1')
#t1.setDaemon(True)
t1.start()
t2 = threading.Thread(target=loop2, args=())
t2.setName('THR_2')
#t2.setDaemon(True)
t2.start()
t3 = threading.Thread(target=loop3, args=())
t3.setName('THR_3')
#t3.setDaemon(True)
t3.start()
# 预期3秒后,thread2已经自动结束
time.sleep(3)
# enumerate 得到正在运行子线程,即子线程1和子线程3
for thr in threading.enumerate():
# getName能够得到线程的名字
print("正在运行的线程名字是: {0}".format(thr.getName()))
print("正在运行的子线程数量为: {0}".format(threading.activeCount()))
print('All done at:', time.ctime())
if __name__ == '__main__':
main()
while True:
exit()
# 工业风案例案例05
import threading
from time import sleep, ctime
loop = [4,2]
class ThreadFunc:
def __init__(self,name):
self.name = name
def loop(self, nloop, nsec):
print('Start loop', nloop, 'at', ctime())
sleep(nsec)
print('Done loop', nloop, 'at', ctime())
def main():
print("Starting at:", ctime())
t = ThreadFunc('loop')
t1 = threading.Thread(target=t.loop, args=('Loop1', 4))
t2 = threading.Thread(target=ThreadFunc('loop').loop, args=('Loop2', 2))
t1.start()
t2.start()
t1.join()
t2.join()
print('all done at:', ctime())
if __name__ == '__main__':
main()
while True:
exit()
共享变量
- 共享变量:当多个线程同时访问一个变量的时候,会产生共享变量的问题
- 解决变量:索,信号灯
- 锁(Lock):
- 是一个标准,表示一个线程在占用一些资源
- 使用方法
- 上锁
- 使用共享资源,放心的用
- 取消锁,释放锁
- 案例06
- 锁谁:哪个资源需要多个线程共享,锁哪个
- 理解锁:锁其实不是锁住谁,而是一个令牌
import threading
import time
sum = 0
loopSum = 1000000
lock = threading.Lock()
def myAdd():
global sum, loopSum
time.sleep(2)
for i in range(1, loopSum+1):
# 上锁,申请锁
lock.acquire()
sum += 1
# 释放锁
lock.release()
def myMinu():
global sum, loopSum
time.sleep(2)
for i in range(1, loopSum+1):
lock.acquire()
sum -= 1
lock.release()
def main():
print('Starting {0}'.format(sum))
t1 = threading.Thread(target=myAdd, args=())
t2 = threading.Thread(target=myMinu, args=())
t1.start()
t2.start()
t1.join()
t2.join()
print("Done {0}".format(sum))
if __name__ == '__main__':
main()
while True:
exit()
线程安全问题
-
如果一个资源/变量,他对于多线程来讲,不用加锁也不会引起任何问题,则称为线程安全
-
线程不安全变量类型:list,set,dict
-
线程安全变量类型:queue
-
生产者消费者问题
- 一个模型,可以用来搭建消息队列
- queue是一个用来存放变量的数据结构,特点是先进先出,内部元素排队,可以理解成一个特殊的list
import threading
import time
from queue import Queue
class Producer(threading.Thread):
def run(self):
global q
count = 0
while count <= 10:
# qsize返回queue的内容长度
if q.qsize() < 10:
for i in range(5):
count += 1
msg = self.name + '生成产品' + str(count)
q.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global q
while True:
if q.qsize() > 4:
for i in range(5):
msg = self.name + '消费了' + q.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
q = Queue()
#for i in range(5):
#q.put('初始产品'+str(i))
for i in range(2): #线程数量为2
p = Producer()
p.start()
for i in range(2):
c = Consumer()
c.start()
while True:
exit()
- 死锁问题
- 锁的等待时间问题
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def func_1():
print('func_1 starting……')
lock_1.acquire(timeout=4)
print('func_1 申请了 lock_1……')
time.sleep(2)
print('func_1 等待 lock_2……')
rst = lock_2.acquire(timeout=2)
if rst:
print('func_1 已经得到锁lock_2')
lock_2.release()
print('func_1 释放了锁 lock_2……')
else:
print('func_1 注定没申请到锁lock_2……')
lock_1.release()
print('func_1 释放了lock_1')
print('func_1 done……')
def func_2():
print('func_2 starting……')
lock_2.acquire()
print('func_2 申请了 lock_2……')
time.sleep(4)
print('func_2 等待 lock_1……')
lock_1.acquire()
print('func_2 申请了 lock_1……')
lock_1.release()
print('func_2 释放了 lock_1……')
lock_2.release()
print('func_2 释放了 lock_2……')
print('func_2 done……')
def main():
print('主程序启动……')
t1 = threading.Thread(target=func_1, args=())
t2 = threading.Thread(target=func_2, args=())
t1.start()
t2.start()
t1.join()
t2.join()
print('主程序结束……')
if __name__ == "__main__":
main()
while True:
exit()
semphore 限制线程数量
- 允许一个资源最多由几个多线程同时使用
- 案例
import threading
import time
# 参数定义最多几个线程同时使用资源
semaphore = threading.Semaphore(3)
def func():
if semaphore.acquire():
for i in range(5): # 一个线程重复5次
print(threading.currentThread().getName() + 'get semaphore')
time.sleep(1)
semaphore.release()
print(threading.currentThread().getName() + 'release semaphore')
for i in range(8): # 八个线程
t1 = threading.Thread(target=func)
t1.start()
threading.Timer
- Timer是利用多线程,在指定时间后启动一个功能
import threading
import time
def func():
print('I am running……')
time.sleep(2)
print('I am done……')
if __name__ == '__main__':
t = threading.Timer(3, func) # 3秒后执行func
t.start()
i = 0
while i <=8 :
print('{0}**********',format(i))
time.sleep(1)
i += 1
可重入锁
- 一个锁,可以被一个线程多次申请
- 主要解决递归调用的时候,需要申请锁的情况
import threading
import time
class MyThread(threading.Thread):
def run(self):
global num
time.sleep(1)
if mutex.acquire(1):
num += 1
msg = self.name + ' set num to ' + str(num)
print(msg)
mutex.acquire()
mutex.release()
mutex.release()
num = 0
mutex = threading.RLock()
def test():
for i in range(5):
t = MyThread()
t.start()
if __name__ == "__main__":
test()
线程替代方案
- subprocess
- 完全跳过线程,使用进程
- 是派生进程的主要替代方案
- multiprocessing
- 使用threading接口派生,使用子进程
- 允许为多核或者多cpu派生进程,接口跟threading非常相似
- concurrent.futures
- 新的异步执行模块
- 任务级别的操作
- 3.2后引入
多进程
- 进程间通讯(InterprocessCommunication,IPC)
- 进程之间无任何共享状态
- 进程的创建
- 直接生成Process实例对象,案例19
- 派生子类,案例20
import multiprocessing
from time import sleep, ctime
# 案例 19
def clock(interval):
while True:
print('The time is %s' % ctime())
sleep(interval)
if __name__ == '__ main__':
p = multiprocessing.Process(target = clock, args = (5,))
p.start()
# 案例 20
class ClockProcess(multiprocessing.Process):
def __init__(self,interval):
super().__init__()
self.interval = interval
def run(self):
while True:
print('The time is %s' % ctime())
sleep(self.interval)
if __name__ == '__main__':
p = ClockProcess(3)
p.start()
- 在os中查看pid.ppid以及他们的关系
- 案例 21
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())#得到父进程的ppid
print('process id:', os.getpid()) # 得到本身进程Id
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
- 生产者消费者模型
- JoinableQueue
- 案例22
import multiprocessing
from time import ctime
def consumer(input_q):
print('Info consumer:', ctime())
while True:
item = input_q.get()
if item is None: # 哨兵
break
print('pull', item, 'out of q') #此处替换为有用的工作
#input_q.task_done()
#发出信号通知任务完成
print('out of consumer:', ctime())
# 此句执行完成,再转入主进程
def producer(sequence, output_q):
print('Into producer:', ctime())
for item in sequence:
output_q.put(item)
print('put', item, 'into q')
print('out of producer:', ctime())
# 建立进程
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
# 运行消费者进程
# 多进程
cons_p1 = multiprocessing.Process(target = consumer, args = (q,))
cons_p1.start()
cons_p2 = multiprocessing.Process(target = consumer, args = (q,))
cons_p2.start()
# 生产多个项,sequence代表要发送给消费者的项序列
# 在实践中,这可能是生成器的输出或通过一些其他方式生产出来
sequence = [1,2,3,4]
producer(sequence, q)
# 等待所有项被处理
q.put(None)
q.put(None)
cons_p1.join()
cons_p2.join()
#q.join()