Day10 多线程理论 开启线程

多线程:

多线程和多进程的不同是他们占用的资源不一样,

一个进程里边可以包含一个或多个进程,

进程的开销大,线程的开销小。

打个比方来说:创建一个进程,就是创建一个车间。创建一个线程,就是在一个车间创建一个流水线。


怎么去开启一个线程:

方法一(直接用默认的类):

 1 开启线程的方式一:使用替换threading模块提供的Thread
 2 from threading import Thread
 3 from multiprocessing import Process
 4 
 5 def task():
 6     print('is running')
 7 
 8 if __name__ == '__main__':
 9     t=Thread(target=task,)
10     # t=Process(target=task,)
11     t.start()
12     print('')

方法二(自己定义一个类,继承系统的类):

 1 #开启线程的方式二:自定义类,继承Thread
 2 from threading import Thread
 3 from multiprocessing import Process
 4 class MyThread(Thread):
 5     def __init__(self,name):
 6         super().__init__()    #不破坏原有的类
 7         self.name=name
 8     def run(self):
 9         print('%s is running' %self.name)
10 
11 if __name__ == '__main__':
12     t=MyThread('egon')
13     # t=Process(target=task,)
14     t.start()
15     print('')

在同时开始多个线程和多个进程的时候,多个线程的pid是不一样的,多个线程的pid是一样的。

可以利用这段代码测试下:

from threading import Thread
from multiprocessing import Process
import os

def task():
    print('%s is running' %os.getpid())

if __name__ == '__main__':
    # t1=Thread(target=task,)
    # t2=Thread(target=task,)
    t1=Process(target=task,)
    t2=Process(target=task,)
    t1.start()
    t2.start()
    print('',os.getpid())

多个线程是共享同一个进程内的资源的。


Thread的其他相关的属性或者方法:

from threading import Thread,activeCount,enumerate,current_thread
import time
def task():
    print('%s is running' %current_thread().getName())
    time.sleep(2)

if __name__ == '__main__':
    t=Thread(target=task,)
    t.start()
    t.join()
    print(t.is_alive())     #判断线程是否是活着的     
    print(t.getName())    #获取到线程名
    print(enumerate())    #显示当前活跃的进程对象
    print('主')
    print(activeCount())   #查看活着的线程数,一般是一个主进程(相当于也是一个线程)和开启的数量

  

current_thread的用法:

current_thread的用法
from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time

def task():
    print('%s is running' %current_thread().getName())
    time.sleep(2)

if __name__ == '__main__':
    p=Process(target=task)
    p.start()
    print(current_thread())

主线程从执行层面上代表了其所在进程的执行过程。


守护线程:

无论是进程还是线程,都是遵循:守护XXX会等待主XXX运行完毕后被销毁。

需要强调的是:运行完毕并非终止运行。

主进程运行完了还需要等待子进程运行。

先看:守护进程

from multiprocessing import Process
import time

def task1():
    print('123')
    time.sleep(1)
    print('123done')

def task2():
    print('456')
    time.sleep(10)
    print('456done')

if __name__ == '__main__':
    p1=Process(target=task1)
    p2=Process(target=task2)
    p1.daemon = True           #p1加了守护进程
    p1.start()
    p2.start()
    print('主')

p1加了守护进程后,在主进程运行结束后,p1的进程也死掉。

p2没有加守护进程,在主进程运行结束后,p2会继续运行。

#再看:守护线程

from threading import Thread
import time

def task1():
    print('123')
    time.sleep(10)
    print('123done')

def task2():
    print('456')
    time.sleep(1)
    print('456done')

if __name__ == '__main__':
    t1=Thread(target=task1)
    t2=Thread(target=task2)
    t1.daemon=True
    t1.start()
    t2.start()
    print('主')

主线程会等其他线程(相当于子线程)运行完毕,程序才会结束。

如果(子)线程加上了守护线程,当主线程运行结束后,(子)线程没有运行结束的都会被干掉。

需要特别注意的一点,这里如果还有其他线程没有运行完,主线程是不会结束的。


GIL全局解释器锁:

GIL本质就是一把互斥锁。

这把锁的意义在于在代码执行的时候,防止多段代码抢占资源。

在Cpython解释器中,同一个进程下开启多个多线程,同一时刻只能有一个线程执行,无法利用多核优势。

from threading import Thread
n=100
def task():
    print('is running')

if __name__ == '__main__':
    t1=Thread(target=task,)
    t2=Thread(target=task,)
    t3=Thread(target=task,)
    # t=Process(target=task,)
    t1.start()
    t2.start()
    t3.start()
    print('')

比如说python执行这样一个程序:

python解释器除了自己开的线程外,还有一些其他的线程,比如说内存回收机制。

线程1,2,3,4都是同一段代码,这三段代码都是要交给python解释器来执行的。

当python代码和内存回收机制同时进行的时候,可能会将数据紊乱。

 所以为了防止这种情况:

可以在解释器的代码前后加入这些代码:

GIL.acquire()
解释器的代码()
GIL.release()

 

python多线程利用不了多核优势,多进程能够利用多核优势。

 1 #线程的互斥锁
 2 from threading import Thread,Lock
 3 import time
 4 n=100
 5 def work():
 6     global n
 7     mutex.acquire()    #加GIL锁
 8     temp=n
 9     time.sleep(0.1)
10     n=temp-1
11     mutex.release()    #解GIL锁
12 
13 if __name__ == '__main__':
14     mutex=Lock()      #造出这把所
15     l=[]
16     start=time.time()
17     for i in range(100):
18         t=Thread(target=work)
19         l.append(t)
20         t.start()
21 
22     for t in l:
23         t.join()
24     print('run time:%s value:%s' %(time.time()-start,n))

互斥锁和join的区别:

上面的互斥锁的代码,下面是join的代码:

from threading import Thread,Lock
import time
n=100
def work():
    time.sleep(0.05)
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1


if __name__ == '__main__':
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        t.start()
        t.join()

    print('run time:%s value:%s' %(time.time()-start,n))

join和互斥锁的代码的区别是:

join是将每一个循环斗都执行了,相当于所有的代码串行了。

而互斥锁是只将加锁的代码串行,其他的代码串行。

总结一个结论:如果碰到的是纯IO的操作,利用不了多核优势,一般利用多线程。

如果碰到的是计算的操作,能够利用多核的优势,一般利用多进程

#多线程
优点:开销小 缺点:不能利用多核优势 from threading import Thread from multiprocessing import Process import time #计算密集型 def work(): res=1 for i in range(100000000): res+=i if __name__ == '__main__': p_l=[] start=time.time() for i in range(4): # p=Process(target=work) #6.7473859786987305 p=Thread(target=work) #24.466399431228638 p_l.append(p) p.start() for p in p_l: p.join() print(time.time()-start) #从运算结果上得到结果可以看到,在有4核的机器上,纯计算的多线程程序能利用多核优势,多线程无法利用多核优势。
from threading import Thread
from multiprocessing import Process
import time
#IO密集型
def work():
    time.sleep(2)

if __name__ == '__main__':
    p_l=[]
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #12.104692220687866
        p=Thread(target=work) #2.038116455078125
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print(time.time()-start)


#IO密集型程序能够利用单核优势,开线程比开进程的优势在于线程的资源比进程的资源小。

死锁与递归锁:

#死锁现象
#一个线程起来以后,运行函数F1内容(去抢A锁,然后去抢B锁,然后释放B锁,释放A锁),然后运行F2的程序(先抢B锁,然会抢A锁,等待一小下,然后释放A锁,释放B锁) #同时起了20个线程,当一个线程起来后,运行了A过程,其他线程抢不到A锁,当释放了A锁后,运行B过程,这时候抢到了B锁。其他线程抢到了A锁,第一个线程需要A锁,其他进程需要B锁,就会造成死锁现象。
from threading import Thread,Lock,RLock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutexB.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) time.sleep(1) mutexA.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(20): t=Mythread() t.start()

解决死锁的问题可以利用递归锁:

#递归锁   RLock
#RLock的运行原理:当锁一旦acquire的话,计数器+1,当锁release的话,计数器就减一。
#其他锁想抢占锁的时候,就必须得等到锁的计数器减到0才能抢占,这样就避免了死锁现象的发生。2017-09-06 from threading import Thread,Lock,RLock import time mutex=RLock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutex.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutex.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) mutex.release() mutex.release() def f2(self): mutex.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) time.sleep(1) mutex.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutex.release() mutex.release() if __name__ == '__main__': for i in range(20): t=Mythread() t.start()

信号量:

(本质也是一把锁,就好像是一个公共厕所,可以进多个人)

可以让有数量的进程一起运行。

from threading import Thread,current_thread,Semaphore
import time,random
#current_thread    得到线程的名称
#Semaphore    指定线程的最大运行数量函数


#定义最多允许5个进程同时运行
sm=Semaphore(5)
def work():               
    sm.acquire()
    print('%s 上厕所' %current_thread().getName())
    time.sleep(random.randint(1,3))
    sm.release()

if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=work)
        t.start()

信号量的执行是你先创造几把锁,这样就定义了同时能够运行的最大数量。

当有一个线程(或进程)运行了以后,拿走一个锁,count+1.当线程(或线程)运行完毕以后,count-1.

这时候其他的进程或者线程才能够再去抢这把锁。


事件Event:

一共两个进程(或者进程),第二个进程的执行需要得到第一个的执行结果才能执行。

例如说连接数据库需要两个进程,一个检测数据库是否可以连接,一个连接。from threading import Thread,current_thread,Evenimport timeevent=Event()


def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise ConnectionError('链接失败') print('%s 等待第%s次链接mysql' %(current_thread().getName(),count)) event.wait(0.5)
#这里设置的wait是等下面check_mysql的event设置为True count
+=1 print('%s 链接ok' % current_thread().getName()) def check_mysql(): print('%s 正在检查mysql状态' %current_thread().getName()) time.sleep(1)
#这里是将event设置为True
event.set() if __name__ == '__main__': t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) check=Thread(target=check_mysql) t1.start() t2.start() check.start()

定时器:

from threading import Timer

def hello(n):
    print("hello, world",n)

t = Timer(3, hello,args=(11,))
t.start()  # after 1 seconds, "hello, world" will be printed
#定义了以后在几秒以后执行

线程queue:

针对于线程,不针对于进程

import queue
#这个是针对于线程的

# q=queue.Queue(3) #队列:先进先出 # q.put(1) # q.put(2) # q.put(3) # # print(q.get()) # print(q.get()) # print(q.get()) # q=queue.LifoQueue(3) #堆栈:后进先出 # q.put(1) # q.put(2) # q.put(3) # # print(q.get()) # print(q.get()) # print(q.get()) q=queue.PriorityQueue(3) #数字越小优先级越高
#前面放的是优先级,数字越小优先级越高
q.put((10,'data1')) q.put((11,'data2')) q.put((9,'data3')) print(q.get()) print(q.get()) print(q.get())

进程池与线程池:

这个模块提高了一个更高级的接口来异步调用。

异步调用就是将一堆任务直接丢到进程池或者线程池,不用等待执行完成。

同步调用是将任务丢到进程池或者线程池,还要等待他的执行结果。

线程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

#引用线程池和进程池函数
from threading import current_thread import os,time,random def work(n): print('%s is running' %current_thread().getName()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': p=ThreadPoolExecutor()
#生成一个线程池 objs
=[] for i in range(21): obj=p.submit(work,i)
#将所有线程提交到线程池 objs.append(obj) p.shutdown()
#等待所有线程结束

#打印线程池内的线程
for obj in objs: print(obj.result())

进程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import os,time,random
def work(n):
    print('%s is running' %current_thread().getName())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    p=ThreadPoolExecutor()
    objs=[]
    for i in range(21):
        obj=p.submit(work,i)
        objs.append(obj)
    p.shutdown()
    for obj in objs:
        print(obj.result())

模拟IO密集型操作,看看是进程池的程序好还是线程池的程序好:

模拟下载网页的过程:(IO需要等待下载)

进程池:

 1 进程池
 2 import requests #pip3 install requests
 3 import os,time
 4 from multiprocessing import Pool
 5 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 6 def get_page(url):
 7     print('<%s> get :%s' %(os.getpid(),url))
 8     respone = requests.get(url)
 9     if respone.status_code == 200:
10         return {'url':url,'text':respone.text}
11 
12 def parse_page(obj):
13     dic=obj.result()
14     print('<%s> parse :%s' %(os.getpid(),dic['url']))
15     time.sleep(0.5)
16     res='url:%s size:%s
' %(dic['url'],len(dic['text'])) #模拟解析网页内容
17     with open('db.txt','a') as f:
18         f.write(res)
19 
20 
21 if __name__ == '__main__':
22 
23     # p=Pool(4)
24     p=ProcessPoolExecutor()
25     urls = [
26         'http://www.baidu.com',
27         'http://www.baidu.com',
28         'http://www.baidu.com',
29         'http://www.baidu.com',
30         'http://www.baidu.com',
31         'http://www.baidu.com',
32         'http://www.baidu.com',
33     ]
34 
35 
36     for url in urls:
37         # p.apply_async(get_page,args=(url,),callback=parse_page)
38         p.submit(get_page,url).add_done_callback(parse_page)
39 
40     p.shutdown()
41     print('主进程pid:',os.getpid())

线程池:

 1 线程池
 2 import requests #pip3 install requests
 3 import os,time,threading
 4 from multiprocessing import Pool
 5 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 6 def get_page(url):
 7     print('<%s> get :%s' %(threading.current_thread().getName(),url))
 8     respone = requests.get(url)
 9     if respone.status_code == 200:
10         return {'url':url,'text':respone.text}
11 
12 def parse_page(obj):
13     dic=obj.result()
14     print('<%s> parse :%s' %(threading.current_thread().getName(),dic['url']))
15     time.sleep(0.5)
16     res='url:%s size:%s
' %(dic['url'],len(dic['text'])) #模拟解析网页内容
17     with open('db.txt','a') as f:
18         f.write(res)
19 
20 
21 if __name__ == '__main__':
22 
23     # p=Pool(4)
24     p=ThreadPoolExecutor(3)
25     urls = [
26         'http://www.baidu.com',
27         'http://www.baidu.com',
28         'http://www.baidu.com',
29         'http://www.baidu.com',
30         'http://www.baidu.com',
31         'http://www.baidu.com',
32         'http://www.baidu.com',
33     ]
34 
35 
36     for url in urls:
37         # p.apply_async(get_page,args=(url,),callback=parse_page)
38         p.submit(get_page,url).add_done_callback(parse_page)
39 
40     p.shutdown()
41     print('主进程pid:',os.getpid())
View Code
原文地址:https://www.cnblogs.com/sexiaoshuai/p/7472687.html