进程是资源分配的最小单位, 线程是CPU调度的最小单位. 每一个进程中至少有一个线程。
线程与进程的区别
1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
1)轻型实体
线程中的实体基本上不拥有系统资源,只是有一点必不可少的、能保证独立运行的资源。
线程的实体包括程序、数据和TCB。线程是动态概念,它的动态特性由线程控制块TCB(Thread Control Block)描述。
2)独立调度和分派的基本单位。
在多线程OS中,线程是能独立运行的基本单位,因而也是独立调度和分派的基本单位。由于线程很“轻”,故线程的切换非常迅速且开销小(在同一进程中的)。
3)共享进程资源。
线程在同一进程中的各个线程,都可以共享该进程所拥有的资源,这首先表现在:所有线程都具有相同的进程id,这意味着,线程可以访问该进程的每一个内存资源;此外,还可以访问进程所拥有的已打开文件、定时器、信号量机构等。由于同一个进程内的线程共享内存和文件,所以线程之间互相通信不必调用内核。
4)可并发执行。
在一个进程中的多个线程之间,可以并发执行,甚至允许在一个进程中所有线程都能并发执行;同样,不同进程中的线程也能并发执行,充分利用和发挥了处理机与外围设备并行工作的能力。
同一进程的线程共享进程数据资源,线程内部有自己的数据栈和寄存器,数据不共享
GIL 全局解释器锁,同一时刻,只能有一个线程访问CPU,锁的是线程,原因是Cpython解释器在解释代码的过程中容易产生数据不安全问题。
而python的多线程到底有没有用,我们需要看任务是I/O密集型,还是计算密集型:
如果是I/O密集型任务,有再多核也没用,即能开再多进程也没用,所以我们利用python的多线程一点问题也没有;
如果是计算密集型任务,我们就直接使用多进程就可以了
线程的创建
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主线程')
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主线程')
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主线程/主进程pid',os.getpid()) #part2:开多个进程,每个进程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid())
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据 同一进程内的线程共享该进程的数据
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主线程') print(t.is_alive())
守护线程
#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必须在t.start()之前设置 t.start() print('主线程') print(t.is_alive()) ''' 主线程 True '''
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
GIL锁,全局解释器锁,保证同一进程中只有一个线程被cpu调度,保证了数据的安全性
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果可能为99
from threading import Thread,Lock import os,time def work(lock): lock.acquire() global n time.sleep(0.1) n=n-1 print('打印work') lock.release() if __name__ == '__main__': n=100 l=[] for i in range(100): lock = Lock() p=Thread(target=work,args=(lock,)) l.append(p) p.start() print('执行for循环') for p in l: p.join() print(n)
死锁与递归锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
from threading import Lock as Lock import time mutexA=Lock() print(123) mutexA.acquire() print(456) mutexA.acquire() print(789) mutexA.release() mutexA.release()
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 抢到了面条'%name) fork_lock.acquire() print('%s 抢到了叉子'%name) print('%s 吃面'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 抢到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 抢到了面条' % name) print('%s 吃面' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
Lock在线程较多,循环较多,且需要判断数据是否满足执行条件的时候(比如Lock版的生产者-消费者模型),会进行反复的上锁与解锁,这样是非常消耗CPU资源的
为了解决这个问题,我们需要引入一个继承自Lock的新方法:Condition
ct = threading.condition() 上锁:ct.acquire() 解锁:ct.release() ct.wait(self,timeout=None):将当前线程处于等待(即阻塞)状态并释放锁。等待状态中的线程可以被其他线程使用notify函数或notify_all函数唤醒,被唤醒后,该线程会继续等待上锁,上锁后继续执行下面的代码 notify(self,n=1):唤醒某一指定线程,默认唤醒等待中的第一个线程 notify_all(self):唤醒所有等待中的线程
# coding=utf-8 import threading import time con = threading.Condition() num = 0 # 生产者 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): # 锁定线程 global num con.acquire() while True: print "开始添加!!!" num += 1 print "火锅里面鱼丸个数:%s" % str(num) time.sleep(1) if num >= 5: print "火锅里面里面鱼丸数量已经到达5个,无法添加了!" # 唤醒等待的线程 con.notify() # 唤醒小伙伴开吃啦 # 等待通知 con.wait() # 释放锁 con.release() # 消费者 class Consumers(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): con.acquire() global num while True: print "开始吃啦!!!" num -= 1 print "火锅里面剩余鱼丸数量:%s" %str(num) time.sleep(2) if num <= 0: print "锅底没货了,赶紧加鱼丸吧!" con.notify() # 唤醒其它线程 # 等待通知 con.wait() con.release() p = Producer() c = Consumers() p.start() c.start()
import threading import random from time import sleep ct = threading.Condition() all_money = 1000 # 基础金钱1000元 count = 10 # 限制生产者只可以生产十次 class producers(threading.Thread): '''生产者模式''' def run(self): global all_money global count while True: ct.acquire() # 处理数据前,先上锁 if count > 0: # 如果生产次数小于十次 money = random.randint(200,1000) # 随机生产200-1000元 all_money += money # 总金钱数 = 原总金钱数+生产金钱数 count -= 1 # 允许生产次数-1 print('生产者%s生产了%d元,剩余金钱%d元' % (threading.current_thread(), money, all_money)) else: # 如果生产次数已满10次 ct.release() # 解锁 break # 生产结束,跳出循环 ct.notify_all() # 通知所有等待中的消费者,生产已完成,可以开始消费 ct.release() # 解锁 sleep(0.5) class comsumer(threading.Thread): '''消费者模式''' def run(self): global all_money global count while True: ct.acquire() # 处理数据前,先上锁 money = random.randint(200,1000) # 随机消费200-1000元 # 下面这个while是重点!(敲黑板,记笔记,后面我会说到的) while money > all_money: # 如果需消费金额大于总金额,则等待至总金额大于需消费金钱 if count == 0: # 如果生产者生产次数已达上限 ct.release() # 结束前解锁 return # 结束函数 print('消费者%s需要消费%d元,剩余金钱%d元,不足' % (threading.current_thread(), money, all_money)) ct.wait() # 进入等待(阻塞进程) all_money -= money # 剩余金额大于消费金额时,总金额 = 原总金额 - 消费金额 print('消费者%s消费了%d元,剩余金钱%d元' % (threading.current_thread(), money, all_money)) ct.release() # 解锁 sleep(0.5) if __name__ == '__main__': for i in range(3): th = comsumer(name='线程%d'%i) th.start() for i in range(5): th = producers(name='线程%d'%i
Python标准模块--concurrent.futures
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=5) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 #shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果 #add_done_callback(fn) 回调函数
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s] ' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
threading.Timer,定时器
import time from threading import Timer def fun(): print('时间同步') Timer(10,fun).start()#10秒之哦户运行函数 # while True: # t = Timer(10,fun).start() # 10秒之哦户运行函数 # time.sleep(5) #每5秒运行一次
线程信号量
同进程的一样
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
import threading,time from threading import Semaphore,Thread def func(sm): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(2) sm.release() if __name__ == '__main__': sm=Semaphore(3) for i in range(30): t=Thread(target=func,args=(sm,)) t.start()