python3第十天

线程:
一套流水线的运行过程,操作系统是个工厂,里面有有多个车间(进程),线程就是每个车间生产的流水线,每个进程,默认都会有个线程
进程是资源单元,线程是cpu上的调度单位
线程的创建开销小,共享空间
进程直接是竞争关系,线程之间是协作关系
 
 
一、线程的模块threading模块
方式一
from threading import Thread
from multiprocessing import Process
 
def task():
    print('is running')
 
if __name__ == '__main__':
    t=Thread(target=task,)#线程
    # t=Process(target=task,)#进程
    t.start()
    print('主’)
 
线程的结果:
is running
 
进程:
is running
进程开销大
线程开销小
 
方式二
from threading import Thread
from multiprocessing import Process
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s is running' %self.name)
 
if __name__ == '__main__':
    t=MyThread('egon')
    # t=Process(target=task,)
    t.start()
    print('主’)
 
 
二、pid:
多线程的pid是一样的
多进程 pid不一样
 
三、多线程共享同一个进程内的资源:
from threading import Thread
from multiprocessing import Process
n=100
def work():
    global n
    n=0
 
if __name__ == '__main__':
 
    # p=Process(target=work,)
    # p.start()
    # p.join()
    # print('主',n)
 
    t=Thread(target=work,)
    t.start()
    t.join()
    print('主',n)
线程的n会变,进程不会。
因为线程共享资源,进程不是,而且进程pid不一样。
 
 
四、多线程共享同一进程内存地址空间:
from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        msg_l.append(msg)
 
def format():
    while True:
        if msg_l:
            data=msg_l.pop()
            format_l.append(data.upper())
 
def save():
    while True:
        if format_l:
            data=format_l.pop()
            with open('db.txt','a') as f:
                f.write('%s ' %data)
 
if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format)
    t3=Thread(target=save)
 
    t1.start()
    t2.start()
    t3.start()
 
 
Threading其他方法:
#current_thread:当前的线程对象
from threading import Thread,activeCount,enumerate,current_thread
import time
def task():
    print('%s is running' %current_thread().getName())
    time.sleep(2)
 
if __name__ == '__main__':
    t=Thread(target=task,)
    t.start()
    t.join()#等待task运行结束(系统回收完)
    print(t.is_alive())#返回线程是否活动
    print(t.getName())#返回线程名
    print(enumerate())#当前活跃的进程对象
    print('主')
    print(activeCount())#活着的线程数
 
主线程从执行层面上代表了,其所在进程的执行过程。
 
 
五、守护线程
无论是进程还是线程,都遵循:守护会等待主运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
from threading import Thread
import time
 
def task1():
    print('123')
    time.sleep(10)
    print('123done')
 
def task2():
    print('456')
    time.sleep(1)
    print('456done')
 
if __name__ == '__main__':
    t1=Thread(target=task1)
    t2=Thread(target=task2)
    t1.daemon=True
    t1.start()
    t2.start()
    print('主’)
 
六、GIL

三个需要注意的点:
1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来

2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果

七、互斥锁
from threading import Thread,Lock
import time
n=100
def work():
time.sleep(0.05)
global n
mutex.acquire()
temp=n
time.sleep(0.1)
n=temp-1
mutex.release()

if __name__ == '__main__':
mutex=Lock()
l=[]
start=time.time()
for i in range(100):
t=Thread(target=work)
l.append(t)
t.start()

for t in l:
t.join()
print('run time:%s value:%s' %(time.time()-start,n))


#多进程:
#优点:可以利用多核优势
#缺点:开销大


#多线程:
#优点:开销小
#缺点:不能利用多核优势

# from threading import Thread
# from multiprocessing import Process
# import time
# #计算密集型
# def work():
# res=1
# for i in range(100000000):
# res+=i
#
# if __name__ == '__main__':
# p_l=[]
# start=time.time()
# for i in range(4):
# # p=Process(target=work) #6.7473859786987305
# p=Thread(target=work) #24.466399431228638
# p_l.append(p)
# p.start()
# for p in p_l:
# p.join()
#
# print(time.time()-start)


from threading import Thread
from multiprocessing import Process
import time
#IO密集型
def work():
time.sleep(2)

if __name__ == '__main__':
p_l=[]
start=time.time()
for i in range(400):
# p=Process(target=work) #12.104692220687866
p=Thread(target=work) #2.038116455078125
p_l.append(p)
p.start()
for p in p_l:
p.join()

print(time.time()-start)

八、死锁与递归锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
死锁现象
from threading import Thread,Lock,RLock
import time
mutexA=Lock()
mutexB=Lock()
class Mythread(Thread):
def run(self):
self.f1()
self.f2()

def f1(self):
mutexA.acquire()
print('33[45m%s 抢到A锁33[0m' %self.name)
mutexB.acquire()
print('33[44m%s 抢到B锁33[0m' %self.name)
mutexB.release()
mutexA.release()

def f2(self):
mutexB.acquire()
print('33[44m%s 抢到B锁33[0m' %self.name)
time.sleep(1)
mutexA.acquire()
print('33[45m%s 抢到A锁33[0m' %self.name)
mutexA.release()
mutexB.release()


if __name__ == '__main__':
for i in range(20):
t=Mythread()
t.start()

递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

#递归锁
from threading import Thread,Lock,RLock
import time
mutex=RLock()
class Mythread(Thread):
def run(self):
self.f1()
self.f2()

def f1(self):
mutex.acquire()
print('33[45m%s 抢到A锁33[0m' %self.name)
mutex.acquire()
print('33[44m%s 抢到B锁33[0m' %self.name)
mutex.release()
mutex.release()

def f2(self):
mutex.acquire()
print('33[44m%s 抢到B锁33[0m' %self.name)
time.sleep(1)
mutex.acquire()
print('33[45m%s 抢到A锁33[0m' %self.name)
mutex.release()
mutex.release()


if __name__ == '__main__':
for i in range(20):
t=Mythread()
t.start()
 
九、信号量
from threading import Thread,current_thread,Semaphore
import time,random

sm=Semaphore(5)
def work():
sm.acquire()
print('%s 上厕所' %current_thread().getName())
time.sleep(random.randint(1,3))
sm.release()

if __name__ == '__main__':
for i in range(20):
t=Thread(target=work)
t.start()

十、进程池,线程池
进程池
import requests #pip3 install requests
import os,time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def get_page(url):
print('<%s> get :%s' %(os.getpid(),url))
respone = requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}

def parse_page(obj):
dic=obj.result()
print('<%s> parse :%s' %(os.getpid(),dic['url']))
time.sleep(0.5)
res='url:%s size:%s ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
with open('db.txt','a') as f:
f.write(res)


if __name__ == '__main__':

# p=Pool(4)
p=ProcessPoolExecutor()
urls = [
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
]


for url in urls:
# p.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page,url).add_done_callback(parse_page)

p.shutdown()
print('主进程pid:',os.getpid())


线程池
import requests #pip3 install requests
import os,time,threading
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def get_page(url):
print('<%s> get :%s' %(threading.current_thread().getName(),url))
respone = requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}

def parse_page(obj):
dic=obj.result()
print('<%s> parse :%s' %(threading.current_thread().getName(),dic['url']))
time.sleep(0.5)
res='url:%s size:%s ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
with open('db.txt','a') as f:
f.write(res)


if __name__ == '__main__':

# p=Pool(4)
p=ThreadPoolExecutor(3)
urls = [
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
]


for url in urls:
# p.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page,url).add_done_callback(parse_page)

p.shutdown()
print('主进程pid:',os.getpid())
 
原文地址:https://www.cnblogs.com/jnbb/p/7497547.html