线程

0进程和线程之间的关系:

      进程是负责资源分配的,线程是实际上帮你执行代码的,线程不能独立存在,必须给他分配资源

线程和进程的区别:

        进程
             计算机中最小的资源分配单位
             数据隔离
             进程可以独立独立存在
             创建与销毁 还有切换 都慢 给操作系统压力大
        线程
                       计算机中能被CPU调度的最小单位
                       同一个进程中的多个线程资源共享
                       线程必须依赖进程存在
                       创建与销毁 还有切换 都比进程快很多

  

为什么要有线程

 线程本身创建出来就是为了解决并发问题的,并且它的整体效率比进程要高,但是线程实际上也有一些性能上的限制管理调度开销

在整个程序界:
   
   如果你的程序需要数据隔离 : 多进程
      如果你的程序对并发的要求非常高 : 多线程

socketserver 多线程的
web的框架 django flask tornado 多线程来接收用户并发的请求

在整个编程界:
      同一个进程中的多个线程可以同时使用多个cpu

线程锁这件事儿是有Cpython解释器完成
对于python来说 同一时刻只能有一个线程被cpu访问
彻底的解决了多核环境下的安全问题


线程锁 : 全局解释器锁 GIL:
         1.这个锁是锁线程的
          2.这个锁是解释器提供的

threading模块

1.初识线程模块

      from  threading  import  Thread

           process   进程类

           Thread     线程类

2.开启多线程代码 

 import time
 from threading import Thread

 def func():
     print('start')
     time.sleep(1)
     print('end')

 if __name__ == '__main__':
     t = Thread(target=func)
     t.start()
     for i in range(5):
         print('主线程')
         time.sleep(0.3)

说明线程和进程完全异步 说明线程开启速度更快一些

3.开启线程的另一种方式

from threading import Thread
class Mythread(Thread): def __init__(self,arg): #如果想要传参数 super().__init__() self.arg = arg
def run(self): print('in son',self.arg) t = Mythread(123) t.start()

4.进程与线程的效率对比 

#线程 
import time from threading import Thread def func(): n = 1 + 2 + 3 n ** 2 if __name__ == '__main__': start = time.time() lst = [] for i in range(100): #启多个线程 t = Thread(target=func) t.start() lst.append(t) for t in lst: t.join() print(time.time() - start)

#进程
import time
from multiprocessing import Process as Thread
def func():
     n = 1 + 2 + 3
     n**2

if __name__ == '__main__':
     start = time.time()
     lst = []
     for i in range(100):
         t = Thread(target=func)
         t.start()
         lst.append(t)
     for t in lst:
         t.join()
     print(time.time() - start)

线程效率要比进程快的多

5.线程的数据共享 

 from threading import Thread
 n = 100
 def func():     #子线程
     global n
     n -= 1

 t = Thread(target=func)  #主线程
 t.start()
 t.join()
 print(n)

在子线程减1,在主线程打印结果,仍然是减完之后的结果,意思我的全局变量对于我的子线程和主线程数据是共享的

6.threading模块中的其他功能

第一种方法 

 from threading import Thread,currentThread

 class Mythread(Thread):
     def __init__(self,arg):
         super().__init__()
         self.arg = arg

     def run(self):
         print('in son',self.arg,currentThread())

 
 t = Mythread(123)
 t.start()
print("主:",currentThread())
我的ID不同就说明是在不同的线程里面 

第二种方法

 import time
 from threading import Thread,currentThread,activeCount,enumerate
class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg
def run(self): time.sleep(1) print('in son',self.arg,currentThread())
for i in range(10): t = Mythread(123) t.start() print(t.ident) print(activeCount()) print(enumerate())

7.多线程实现socket_server  

server端

import socket
from threading import Thread
def talk(conn):
    while True:
        msg = conn.recv(1024).decode()
        conn.send(msg.upper().encode())

sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
while True:
    conn,addr = sk.accept()
    Thread(target=talk,args = (conn,)).start()

client端

import socket
sk = socket.socket()


sk.connect(('127.0.0.1',9000))
while True:
    sk.send(b'hello')
    print(sk.recv(1024)) 

守护线程

守护进程: 
    只守护主进程的代码,主进程代码结束了就结束守护,守护进程在主进程之前结束
守护线程: 
    随着主线程的结束才结束,守护线程是怎么结束的

进程 terminate 强制结束一个进程的
线程 没有强制结束的方法
线程结束 : 线程内部的代码执行完毕 那么就自动结束了
import time
from threading import Thread def func(): while True: print('in func') time.sleep(0.5) def func2(): print('start func2') time.sleep(10) print('end func2') Thread(target=func2).start() t = Thread(target=func) t.setDaemon(True) t.start() print('主线程') time.sleep(2) print('主线程结束')

 锁 

 锁是用来做什么的?

    保证数据的安全的

GIL是干什么的?

    锁线程

有了GIL还要锁干啥?

    有了GIL还是会出现数据不安全的现象,所以还是要用锁

线程也会出现数据不安全的现象:

 import time
 from threading import Thread,Lock
 n = 100
 def func(lock):
     global n
         tmp = n-1  # n-=1
         time.sleep(0.1)
         n = tmp

 if __name__ == '__main__':
     l = []
     for i in range(100):
         t = Thread(target=func)
         t.start()
         l.append(t)
     for t in l:t.join()
     print(n)

  dis模块使用

import dis
n = 1
def func():
    n = 100
    n -= 1

dis.dis(func)

会出现线程不安全的俩种条件:

     1.是全局变量

   2.出现+=  -= 这样的操作    l[0] +=1   d[k] +=1

列表  字典的方法  l.append  l.pop   l.insert   dic.update  都是线程安全的    

 线程加锁:

 import time
 from threading import Thread,Lock
 n = 100
 def func(lock):
     global n
     with lock:
         tmp = n-1  # n-=1
         time.sleep(0.1)
         n = tmp

 if __name__ == '__main__':
     l = []
     lock = Lock()
     for i in range(100):
         t = Thread(target=func,args=(lock,))
         t.start()
         l.append(t)
     for t in l:t.join()
     print(n)

死锁现象 

死锁问题是怎么出现的:

import time
from threading import Thread,Lock
 noodle_lock = Lock()
 fork_lock = Lock()
 def eat1(name):
     noodle_lock.acquire()
     print('%s拿到面条了'%name)
     fork_lock.acquire()
     print('%s拿到叉子了'%name)
     print('%s开始吃面'%name)
     time.sleep(0.2)
     fork_lock.release()
     print('%s放下叉子了' % name)
     noodle_lock.release()
     print('%s放下面了' % name)

 def eat2(name):
     fork_lock.acquire()
     print('%s拿到叉子了' % name)
     noodle_lock.acquire()
     print('%s拿到面条了' % name)
     print('%s开始吃面' % name)
     time.sleep(0.2)
     noodle_lock.release()
     print('%s放下面了' % name)
     fork_lock.release()
     print('%s放下叉子了' % name)

 Thread(target=eat1,args=('alex',)).start()
 Thread(target=eat2,args=('wusir',)).start()
 Thread(target=eat1,args=('太白',)).start()
 Thread(target=eat2,args=('宝元',)).start()

死锁不是时刻发生的,有偶然的情况整个程序崩溃了 

每一个线程之中不止一把锁,并且套着使用容易出现死锁现象

如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制而应看做一个资源

 解决死锁问题:

import time
from threading import Thread,Lock
 lock = Lock()
 def eat1(name):
     lock.acquire()
     print('%s拿到面条了'%name)
     print('%s拿到叉子了'%name)
     print('%s开始吃面'%name)
     time.sleep(0.2)
     lock.release()
     print('%s放下叉子了' % name)
     print('%s放下面了' % name)

 def eat2(name):
     lock.acquire()
     print('%s拿到叉子了' % name)
     print('%s拿到面条了' % name)
     print('%s开始吃面' % name)
     time.sleep(0.2)
     lock.release()
     print('%s放下面了' % name)
     print('%s放下叉子了' % name)

 Thread(target=eat1,args=('alex',)).start()
 Thread(target=eat2,args=('wusir',)).start()
 Thread(target=eat1,args=('太白',)).start()
 Thread(target=eat2,args=('宝元',)).start()

递归锁

 互斥锁
       无论在相同的线程还是不同的线程,都只能连续acquire一次,要想再acquire,必须先release
 递归锁
       在同一个线程中,可以无限次的acquire
       但是要想在其他线程中也acquire,
       必须现在自己的线程中添加和acquire次数相同的release

from threading import RLock,Lock
1.递归锁 rlock = RLock() # RLock在线程中永远不会被锁住的 rlock.acquire() rlock.acquire() rlock.acquire() rlock.acquire() print('锁不住')
2.互斥锁 lock = Lock() lock.acquire() print('1') lock.acquire() print('2')     #永远打印不出来

递归锁在多个线程中是怎么回事

from threading import RLock,Lock,Thread
 rlock = RLock()
 def func(num):
     rlock.acquire()
     print('aaaa',num)
     rlock.acquire()
     print('bbbb',num)
rlock.release() #俩次release rlock.release() Thread(target=func,args=(1,)).start() Thread(target=func,args=(2,)).start() #阻塞 只有上面执行完代码后经历俩次release把钥匙还回来,才可以执行

用递归锁解决死锁问题:

import time
noodle_lock = fork_lock = RLock()
def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('宝元',)).start()

  信号量

信号量= 锁+计数器 实现的功能

import time
from threading import Semaphore,Thread

 def func(name,sem):
     sem.acquire()
     print(name,'start')
     time.sleep(1)
     print(name,'stop')
     sem.release()

 sem = Semaphore(5)  #信号量给了你五把钥匙,五个线程可以来执行
 for i in range(20):
     Thread(target=func,args=(i,sem)).start()

信号量和池
     进程池
           有1000个任务
           一个进程池中有5个进程
           所有的1000个任务会多次利用这五个进程来完成任务
     信号量
           有1000个任务
           有1000个进程/线程
           所有的1000个任务由于信号量的控制,只能5个5个的执行  

事件

连接数据库:

from threading import Event      #Event 事件 wait() 阻塞 到事件内部标识为True就停止阻塞
import time             #控制标识  set  clear   is_set
import random
from threading import Thread,Event
def connect_sql(e): count = 0 while count < 3: e.wait(0.5) if e.is_set(): print('连接数据库成功') break else: print('数据库未连接成功') count += 1 def test(e): time.sleep(random.randint(0,3)) e.set() e = Event() Thread(target=test,args=(e,)).start() Thread(target=connect_sql,args=(e,)).start()

定时器

定时器,指定n秒后执行某个操作

from threading import Timer

def func():
    print('执行我啦')

t = Timer(3,func)
# 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
t.start()
print('主线程的逻辑')

 条件 

wait      阻塞
notify(n) 给信号
 假如现在有20个线程
 所有的线程都在wait这里阻塞
 notify(n) n传了多少
 那么wait这边就能获得多少个解除阻塞的通知

 

import threading

def run(n):
    con.acquire()
    con.wait()  #阻塞
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire() 输入几就有几个线程往下执行
        con.notify(int(inp))
        con.release()
        print('****')

 线程队列

queue : 线程队列,线程之间数据安全

q = queue . Queue()

#普通队列

q.get()

q.put()

q.get()

q.get_nowait()   如果有数据我就取,如果没数据不阻塞而是报错

q.put_nowait()

class queue.Queue(maxsize=0) #先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 线程池 

1.介绍

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

2.基本方法

submit(fn, *args, **kwargs)      异步提交任务
map(func, *iterables, timeout=None, chunksize=1)       取代for循环submit的操作
shutdown(wait=True)      相当于进程池的pool.close()+pool.join()操作

wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

result(timeout=None)     取得结果
add_done_callback(fn)   回调函数 

1.获取结果的例子
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor
def func(num): print('in %s func'%num,currentThread()) time.sleep(random.random()) return num**2 tp = ThreadPoolExecutor(5) 创建五个线程 分别去执行func里的任务 ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result())

2.shutdown 的特点 相当于close + join
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor

def func(num):
     print('in %s func'%num,currentThread())
     time.sleep(random.random())
     return num**2

  tp = ThreadPoolExecutor(5)
ret_l = []
  for i in range(30):
         ret = tp.submit(func,i)
         ret_l.append(ret)
  tp.shutdown()  # close + join
  for ret in ret_l:
         print(ret.result())

3.进程池和线程池只需要修改一个地方 (线程池和进程池平滑的切换)
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor,processPoolExecutor

def func(num):
     print('in %s func'%num,currentThread())
     time.sleep(random.random())
     return num**2

if__nmae__=="__main__":
  #tp = ThreadPoolExecutor(5) 注释掉改成processPoolExecutor
tp =processPoolExecutor (5) 就变成进程池了不是线程池了
  ret_l = []
  for i in range(30):
         ret = tp.submit(func,i)
         ret_l.append(ret)
  tp.shutdown()  # close + join
  for ret in ret_l:
         print(ret.result())


3.或者另一种引入模块
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool
# from concurrent.futures import ProcessPoolExecutor as Pool
import os def func(num): # print('in %s func'%num,currentThread()) print('in %s func'%num,os.getpid()) time.sleep(random.random()) return num**2 if __name__ == '__main__': # tp = ThreadPoolExecutor(5) tp = Pool(5) 直接用pool ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) tp.shutdown() # close + join for ret in ret_l: print(ret.result())

# 创建一个池
# 提交任务 submit
# 阻塞直到任务完成(close + join) shutdown
# 获取结果 result

简便用法 map

 import os
def func(num): print('in %s func'%num,os.getpid()) time.sleep(random.random()) return num**2
if __name__ == '__main__': tp = Pool(5) ret = tp.map(func,range(30)) for i in ret: print(i)

回调函数 add_done_callback

 def func1(num):
     print('in func1 ',num,currentThread())
     return num*'*'

 def func2(ret):
     print('--->',ret.result(),currentThread())
tp = Pool(5) 创建线程池 print('主 : ',currentThread()) for i in range(10): tp.submit(func1,i).add_done_callback(func2) 回调函数收到的参数是需要使用result()获取的 回调函数是由谁执行的? 主线程

线程爬虫的例子

from urllib.request import urlopen
def func(name,url): content = urlopen(url).read() 打开url获取网页内容 content是一个byts类型 return name,content def parserpage(ret): name,content = ret.result() with open(name,'wb') as f: f.write(content) urls = { 'baidu.html':'https://www.baidu.com', 'python.html':'https://www.python.org', 'openstack.html':'https://www.openstack.org', 'github.html':'https://help.github.com/', 'sina.html':'http://www.sina.com.cn/' } tp = Pool(2) for k in urls: tp.submit(func,k,urls[k]).add_done_callback(parserpage)

  

原文地址:https://www.cnblogs.com/baoshuang0205/p/10060990.html