python之并发编程

一、操作系统与应用程序

  在没有介绍线程与进程相关的知识之前,需要给大家引入一个知识点,关于操作系统与应用程序又与线程与进程有什么关系?

  硬件
...
  装系统
  系统就是一个有程序员写出来的软件,该软件用于控制计算机硬件,让他们之间相互配合
  装软件(安装应用程序)
  ...
  并发与并行
  并发,假,由于执行速度贴别快,感觉不到停顿
  并行,真,创建10个人同时操作
  线程与进程
  单线程与单进程
  一个软件默认一个进程,可以有多个
  线程:
  工作的最小单元
  共享进程中的所有资源
  每个线程可以分担一些任务,最终完成最后的结果
  进程:
  独立开辟内存
  进程之间的数据隔离
  注:GIL锁,是在进程中起到了阻塞线程的作用
  总结:
  1.操作系统帮助开发者操作硬件
  2.程序员写好代码在操作系统上运行(依赖解释器)
  3.任务特别多
   python多线程情况下:
  计算密集型操作:效率低
  IO操作:效率高
  python多进程情况下:
  计算密集型操作:效率高(浪费资源)
  IO操作:效率高(浪费资源)
  以后写python时:
  IO密集型用多线程
  计算密集型用多进程
  java多线程情况下:
  计算密集型操作:效率高
  IO操作:效率高
  java多进程情况下:
  计算密集型操作:效率高(浪费资源)
  IO操作:效率高(浪费资源)

GIL锁:全局解释器锁,用于限制一个进程中间同一个时刻只有一个线程被cpu调度
扩展:默认GIL锁在执行100个指令

二、线程/进程/协程
  线程是程序中一个单一的顺序控制流程。进程内有一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位
 指令运行时的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多进程。
  
  线程的简单示例:
    
 1 import time
 2     import threading
 3     def task():
 4         time.sleep(0)   #0秒也会有延迟
 5         print("拿快递")
 6     # 创建一个线程
 7     t = threading.Thread(target=task)
 8     # 执行线程
 9     t.start()
10 
11     print("干什么。。。")
12 
13     import time
14     import threading
15 
16     def task(n):
17         print("执行任务",n)
18         time.sleep(5)
19         print("...")
20         print("任务%s 执行完毕" %n)
21     while True:
22         name = input("请输入任务")
23         t = threading.Thread(target=task,args=(name,))
24         t.start()

所以说线程就像是流水线一样,走到哪里该作什么,是早已经规划好的。

  线程的使用:

  

1 import threading,time
2 def num():
3     print("456")
4 t = threading.Thread(target=num)
5 t.start()
6 print("123")
7 # 运行结果
8 456
9 123

  setDaemon方法

  

 1 import threading,time
 2 def sleep(arg):
 3     time.sleep(arg)
 4     print(arg)
 5 
 6 t1 = threading.Thread(target=sleep,args=(3,))
 7 t1.start()
 8 
 9 t2 = threading.Thread(target=sleep,args=(6,))
10 t2.setDaemon(True)
11 t2.start()
12 
13 print("1")
14 # 运行结果
15 # 1
16 # 3
17 # 总结,线程中的主线程默认会等其他线程执行完毕后才关闭
18 # 当设置setDaemon(True)的时候会不等

  join方法

  

 1 import threading,time
 2 
 3 def join(arg):
 4     time.sleep(arg)
 5     print(arg)
 6 print("123")
 7 t1 = threading.Thread(target=join,args=(3,))
 8 t1.start()
 9 t1.join()
10 print("456")
11 #  运行结果:
12 # 123
13 # 3
14 # 456
15 # 总结:当在子线程中添加join方法(可以加参数,表示最多等多久),则会等待子线程运行完毕后执行主线程

  获取线程的名称相关

  

 1 import threading
 2 def name():
 3     t = threading.current_thread() # 获取当前线程的对象
 4     print(t.name)
 5 
 6 t1 = threading.Thread(target=name)
 7 t1.setName("amazing")
 8 t1.start()
 9 
10 t2 = threading.Thread(target=name)
11 t2.setName("Magic")
12 t2.start()
13 # 运行结果
14 # amazing
15 # Magic
16 # 总结:可以在函数中添加threading.current_thread()用来获取当前线程的对象

  线程池

 1 from concurrent.futures import ThreadPoolExecutor
 2 import time
 3 
 4 pool = ThreadPoolExecutor(5)        #创建一个线程池,并设置同时最多有5个线程可以被调度
 5 def func(arg):
 6     time.sleep(1)
 7     print(arg)
 8 for i in range(4):
 9     pool.submit(func,i)      # 去线程池中申请一个线程执行func函数
10 # 有节制的约束线程的创建,保证线程的性能。

   进程是计算机是系统进行资源分配和调度的基本单位,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

   进程的编写

 1 import multiprocessing
 2 
 3 def task(arg):
 4     print(arg)
 5 def run():
 6     for i in range(10):
 7         p = multiprocessing.Process(target=task,args=(i,))
 8         p.start()
 9 if __name__ == '__main__':    # 在windows系统中 需要将主进程放在main方法下运行
10     run()

  进程的使用

  join

 1 import multiprocessing,time
 2 def task(arg):
 3     time.sleep(2)
 4     print(arg)
 5 def run():
 6     for i in range(10):
 7         p = multiprocessing.Process(target=task,args=(i,))
 8         p.start()
 9         p.join(1)
10         print("等待结束")
11 if __name__ == '__main__':
12     run()
13 # 总结:join等待进程完成过后才执行

  daemon

 1 import multiprocessing,time
 2 def task(arg):
 3     time.sleep(2)
 4     print(arg)
 5 def run():
 6     print("123")
 7     p = multiprocessing.Process(target=task,args=(789,))
 8     p.daemon = True
 9     p.start()
10 
11     print("456")
12 if __name__ == '__main__':
13     run()
14 总结:当daemon=True表示不等待

  name

 1 import multiprocessing,time
 2 def task(arg):
 3     p = multiprocessing.current_process()
 4     print(p.pid)  #p.ident
 5     time.sleep(2)
 6     print(arg)
 7 def run():
 8     for i in range(10):
 9         p = multiprocessing.Process(target=task,args=(i,))
10         p.name = "asd"
11         p.start()
12         p.join(1)
13         print("等待结束")
14 if __name__ == '__main__':
15     run()

  进程间的相关问题

    进程间的数据是否相互共享?

 1 import multiprocessing
 2 data_list =[]
 3 def task(arg):
 4     data_list.append(arg)
 5     print(data_list)
 6 def run():
 7     for i in range(10):
 8         p = multiprocessing.Process(target=task,args=(i,))
 9         p.start()
10 if __name__ == '__main__':
11     run()
12 进程中的数据是不共享的

    怎么做才能够数据间的相互共享

      Queue()

    

 1 import multiprocessing,queue,time
 2 
 3 q = multiprocessing.Queue()
 4 def task(arg):
 5     q.put(arg)
 6 
 7 def run():
 8     for i in range(10):
 9         p = multiprocessing.Process(target=task,args=(i,))
10         p.start()
11     time.sleep(2)
12     v1 = q.get()
13     print(v1)
14 if __name__ == '__main__':
15     run()

      Manager()

 1 import multiprocessing,queue,time
 2 m = multiprocessing.Manager()
 3 dic = m.dict()
 4 
 5 def task(arg):
 6     dic[arg] = 100
 7 
 8 def run():
 9     for i in range(10):
10         p = multiprocessing.Process(target=task,args=(i,))
11         p.start()
12         p.join()
13     input(">>>")
14     print(dic.valus)
15 if __name__ == '__main__':
16     run()

  进程池

 1 from concurrent.futures import ProcessPoolExecutor
 2 import time
 3 
 4 def task(arg):
 5     time.sleep(2)
 6     print(arg)
 7 
 8 if __name__ == '__main__':
 9     pool  = ProcessPoolExecutor(5)
10     for i in range(10):
11         pool.submit(task,i)

   协程不是真实存在的东西,是程序员创造出来的,协程指的是微线程。

  协程的编写

 1 import greenlet
 2 def func1():
 3     print("123")
 4     gr2.switch()
 5     print("987")
 6     gr2.switch()
 7 def func2():
 8     print("465")
 9     gr1.switch()
10     print("789")
11 # 协程1
12 gr1 = greenlet.greenlet(func1)
13 # 协程2
14 gr2 = greenlet.greenlet(func2)
15 
16 # 执行协程1
17 gr1.switch()

  

 1 基于yield实现协程
 2 def f1():
 3     print(11)
 4     yield
 5     print(22)
 6     yield
 7     print(33)
 8 
 9 def f2():
10     print(44)
11     yield
12     print(55)
13     yield
14     print(66)
15 v1 = f1()
16 v2 = f2()
17 next(v1)

  协程的作用

  对一个线程进程分片,使得线程在代码块之间进行来回切换执行,而不是按原来的流程执行。
  ...

三、锁相关

  "锁"顾名思义,就是用来约束某种对象。

  锁的简单示例

 1 import threading,time
 2 lock = threading.RLock()  # 创建锁都对象
 3 n = 10
 4 def lk(i):
 5     print("代码不加锁",i)
 6     lock.acquire()  #加锁
 7     global n
 8     print("当前线程",i,"读取到的n值为:",n)
 9     n = i
10     time.sleep(1)
11     print("当前线程",i,"读取到的n值为:",n)
12     lock.release()  #释放锁头
13 for i in range(10):
14     t = threading.Thread(target=lk,args=(i,))
15     t.start()
16 # 总结:加完锁之后此区域的代码同一时刻只能有一个线程执行
线程安全与线程不安全的问题
当多线程操作时,内不会让所有线程排队处理这时候线程是安全的,例如:list/dict/Queue
线程不安全 + 人(锁) => 排队处理
锁的用法:
  lock 普通锁 最简单的锁
 1 import threading,time
 2 v = []
 3 lock = threading.Lock()    # 创建一个lock对象
 4 def func(arg):
 5     lock.acquire()      # 添加锁
 6     v.append(arg)
 7     time.sleep(1)
 8     m = v[-1]
 9     print(arg,m)
10     lock.release()          # 释放锁
11 for i in range(10):
12     t = threading.Thread(target=func,args=(i,))
13     t.start()
14 # 总结:在普通锁中当多个lock存在时会死锁
  Rlock 递归锁 也是最常用的锁
import threading,time
lock = threading.RLock()
v = []
def func(arg):
    lock.acquire()
    lock.acquire()      # 添加多个锁
    v.append(arg)
    time.sleep(1)
    m = v[-1]
    print(arg,m)
    lock.release()
    lock.release()
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

# 总结:支持递归的锁多次解多次,允许多个lock同时存

  BoundedSemapore 信号锁

 1 import threading,time
 2 lock = threading.BoundedSemaphore(5)     # 表示通过设定可以放行n个线程
 3 
 4 def func(arg):
 5     lock.acquire()
 6     print(arg)
 7     time.sleep(1)
 8     lock.release()
 9 for i in range(20):
10     t = threading.Thread(target=func,args=(i,))
11     t.start()
12 
13 # 总结:可以一次放行多个线程。

  Condition 条件锁

 1 import threading,time
 2 lock = threading.Condition()
 3 def func(arg):
 4     lock.acquire()
 5     lock.wait()     # 表示等待,线程到此处停止需要一个接收值通知放行多少个线程
 6     print(arg)
 7     time.sleep(1)
 8     lock.release()
 9 for i in range(20):
10     t = threading.Thread(target=func,args=(i,))
11     t.start()
12 
13 while True:
14     user_input = int(input("请输入要放行的个数"))
15     lock.acquire()
16     lock.notify(user_input)
17     lock.release()
18 总结: 在使用该方法时需要在主线程或其他线程中,写通知wait的方法,告诉wait放行多少个线程

  Event 事件锁

 1 import time,threading
 2 lock = threading.Event()
 3 def func(arg):
 4     lock.wait()    # 表示当线程到此处停住,相当于红灯
 5     print(arg)
 6 for i in range(10):
 7     t = threading.Thread(target=func,args=(i,))
 8     t.start()
 9 user_input = input("输入红灯或绿灯")
10 if user_input =="绿灯":
11     lock.set()    # 表示一次性释放所有线程 ,相当于绿灯
12 else:
13     pass
14 
15 # 总结: 一次性放行所有

   进程锁的使用方法大体与线程数相同,只是在导入模块时是import multiprocessing,调用的时候也是。


为甚么要用锁?
1.线程安全,列表和字典线程安全:
非线程安全
控制一段代码
  2.如果多个线程同时修改一个值的时候需要加锁
线程锁/进程锁/GIL锁
  线程锁是为了在线程进行数据操作时保证数据的安全性(涉及到线程安全问题)
  进程锁是为了在数据共享时起到防护的作用
  GIL锁是为了限制同一个进程中只能有一个线程进入python解释器

threading.local()  *
 1 import threading,time
 2 v = threading.local()
 3 
 4 def func(arg):
 5     v.space = arg   # 在函数内部会创建一个属于自己的空间
 6     time.sleep(1)
 7     print(v.space,arg)
 8 
 9 for i in range(10):
10     t = threading.Thread(target=func,args=(i,))
11     t.start()
12 
13 # 总结:为不同的线程创建一个空间(字典),可以去当前自己空间里面存取值

生产者消费者模型
生产者
  队列:先进先出
  栈:先进后出
消费者
更便利

  简单示例:
 1 import queue,threading,time
 2 
 3 q = queue.Queue()
 4 
 5 def producer(id):
 6     """
 7     表示生产者
 8     :param id:
 9     :return:
10     """
11     while True:
12         time.sleep(1)
13         q.put("发送信息")
14         print("%s号用户发送信息"%id)
15 
16 for i in range(1,5):
17     t =threading.Thread(target=producer,args=(i,))
18     t.start()
19 
20 def consumer(id):
21     """
22     表示消费者
23     :param id:
24     :return:
25     """
26     while True:
27         time.sleep(1)
28         q.get("获得信息")
29         print("%s号客服获得信息"%id)
30 
31 for i in range(1,5):
32     t =threading.Thread(target=consumer,args=(i,))
33     t.start()
34 
35 # 总结:生产者消费者模型解决了不用一直等待的问题
 





。。。

  
原文地址:https://www.cnblogs.com/qq631243523/p/9618708.html