day33

一、线程锁

  线程安全,多线程操作时,内部会让所有线程排队处理。如:list/dict/Queue
  线程不安全 + 人 => 排队处理。

1. RLock/Lock(一次放一个)

  示例: 线程安全

  import threading

  v = []
  def func(arg):
      v.append(arg)  # 线程安全
      print(v)

  for i in range(10):
      t = threading.Thread(target=func, args=(i,))
      t.start()

  示例: 线程安全+不安全

    import threading
    import time
    
    v = []
    def func(arg):
        v.append(arg)  # 这里依然线程安全
        # 数据被别人修改,这里线程不安全
        time.sleep(0.01)  # 如果这里的逻辑很长,时间变长,所有人都添加进去,那么取的时候会错乱
        m = v[-1]
        print(arg, m)
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    
    # 结果
    # 1 9
    # 0 9
    # 2 9
    # 4 9
    # 3 9
    # 5 9
    # 6 9
    # 7 9
    # 9 9
    # 8 9

  a. Lock(): 一次放一个(只能锁一次,放一次)

    import threading
    import time
    
    v = []
    lock = threading.Lock()
    
    def func(arg):
        lock.acquire()
        v.append(arg)
        time.sleep(0.01)
        m = v[-1]
        print(arg, m)
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

  b. RLock(): 一次放一个(可以锁多次,放多次) -- 常用

    import threading
    import time
    
    v = []
    lock = threading.RLock()
    
    def func(arg):
        lock.acquire()
        lock.acquire()
    
        v.append(arg)
        time.sleep(0.01)
        m = v[-1]
        print(arg, m)
    
        lock.release()
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

  注意:RLock和Lock用法一样,只是Lock只能锁一次解一次,RLock支持锁多次 解多次,以后用RLock。

2. BoundedSemaphore(n): 一次放N个, 信号量

    import threading
    import time
    
    lock = threading.BoundedSemaphore(3)  # 一次放3个
    def func(arg):
        lock.acquire()
        print(arg)
        time.sleep(1)
        lock.release()
    
    for i in range(20):
        t = threading.Thread(target=func, args=(i,))
        t.start()

3. condition(),一次放x个,x是动态的.

  方式一: 由用户指定一次放行的数量

    import threading
    import time
    
    lock = threading.Condition()

    def func(arg):
        print('线程进来了')
        lock.acquire()
        lock.wait()  # 加锁
    
        print(arg)
        time.sleep(1)
    
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    
    while True:
        inp = int(input(">>>"))
    
        lock.acquire()
        lock.notify(inp)  # 通知放行的个数
        lock.release()

  方式二: 满足某种条件或状态才放行.

    import threading
    import time
    
    lock = threading.Condition()

    def xxxx():
        print("来执行函数了")
        input(">>>")
        return True  # 返回值
    
    def func(arg):
        print('线程进来了')
        lock.wait_for(xxxx)  # 收到的值为真,则放行一个
        print(arg)
        time.sleep(1)
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

4. Event: 一次放所有

    import time
    import threading
    
    lock = threading.Event()
    
    def func(arg):
        print('线程进来了')
        lock.wait()  # 加锁,变红灯
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    
    input('>>>')
    lock.set()  # 绿灯
    input('>>>')
    
    lock.clear()  # 再次变红灯
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    
    lock.set()  # 绿灯
    input('>>>')

总结:
  线程安全, 列表和字典线程安全;
  为什么要加锁?
    - 非线程安全
    - 控制一段代码

5. threading.local

  作用:内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离。
    {
      线程ID: {...}
      线程ID: {...}
      线程ID: {...}
      线程ID: {...}
    }

    import time
    import threading
    
    INFO = {}
    
    class Local(object):
    
        def __getattr__(self, item):
            ident = threading.get_ident()
            return INFO[ident][item]
    
        def __setattr__(self, key, value):
            ident = threading.get_ident()
            if ident in INFO:
                INFO[ident][key] = value
            else:
                INFO[ident] = {key:value}
    
    obj = Local()
    
    def func(arg):
        print(threading.get_ident())
        obj.phone = arg  # obj.phone = arg 调用对象的 __setattr__方法(“phone”,1)
        time.sleep(2)
        print(obj.phone, arg)  # obj.phone 调用对象的__getattr__方法,得到返回值
    
    for i in range(10):
        t = threading.Thread(target=func,args=(i,))
        t.start()
threading.local 的原理
    import threading
    import time
    
    v = threading.local()
    
    def func(arg):
        # 内部护卫当前线程创建一个空间用于存储, phone=自己的值
        v.phone = arg
        time.sleep(2)
        print(v.phone, arg)  # 去当前线程自己空间取值
    
    for i in range(10):
        t = threading.Thread(target=func,args=(i,))
        t.start()
threading.local 的使用

三、线程池

  作用:
    1、限定线程的个数,不会导致由于线程过多导致系统运行缓慢或崩溃
    2、线程池不需要每次都去创建或销毁,通过重复利用已创建的线程降低线程创建和销毁造成的消耗。节约了资源、
    3、当任务到达时,任务可以不需要等到线程创建就能立即执行,响应时间更快。

  以后写代码不要一个一个创建线程,而是创建一个线程池, 每当有一个新任务时,去线程池中申请线程去执行任务.

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def task(a1, a2):
        time.sleep(1)
        print(a1, a2)
    
    # 创建了一个线程池(最多5个线程)
    pool = ThreadPoolExecutor(5)
    
    for i in range(40):
        # 去线程池中申请一个线程,让线程执行task函数
        pool.submit(task, i, 8)

四、生产者消费者模型

  三部件: 生产者, 消费者, 队列

  队列, 先进先出(管道传球)
  栈, 后进先出(子弹夹)

  问题:  生产者消费者模型解决了什么问题?
  答:  不用一直等待的问题

    import queue
    
    q = queue.Queue()
    
    q.put(1)  # 先放 1
    q.put(2)
    q.put(3)
    
    v1 = q.get()  # 先出 1
    print(v1)
栈的简单使用
    import time
    import queue
    import threading
    
    q = queue.Queue()  # 线程安全
    
    def producer(id):
        """
        生产者
        :return:
        """
        while True:
            time.sleep(2)
            q.put('包子')
            print('厨师%s 生产了一个包子' %id )
    
    for i in range(1,4):
        t = threading.Thread(target=producer,args=(i,))
        t.start()
    
    
    def consumer(id):
        """
        消费者
        :return:
        """
        while True:
            time.sleep(1)
            v1 = q.get()
            print('顾客 %s 吃了一个包子' % id)
    
    for i in range(1,3):
        t = threading.Thread(target=consumer,args=(i,))
        t.start()
生产者消费者模型 - 生产包子/吃包子

五、面向对象补充

  

原文地址:https://www.cnblogs.com/kangqi452/p/11797270.html