Python multithreading (threading).

As usual, reference links first:

https://www.cnblogs.com/jiangfan95/p/11439543.html

https://www.liaoxuefeng.com/wiki/1016959663602400/1017629247922688

https://blog.csdn.net/mr__l1u/article/details/81772073

First, a copied summary of the characteristics of processes and threads.

1> Processes, threads and coroutines:

  A process is the independent unit the OS uses for resource allocation and scheduling;

  A thread is an entity inside a process, and the basic unit of CPU scheduling and dispatch;

  A coroutine is a "micro-thread": it carries its own context and is an even smaller unit of execution than a thread;

2> Differences

  A program has at least one process, and a process has at least one thread;

  Threads are finer-grained than processes (they own fewer resources), which gives multithreaded programs higher concurrency;

  A process runs with its own independent memory space, while the threads inside it share memory, which greatly improves efficiency;

  A thread cannot run on its own; it must live inside a process;

3> Pros and cons: each has both. Threads are cheap to start and switch, but resources are harder to manage and protect; processes are the reverse.

4> Coroutines: picture a process as a Lanzhou noodle shop; the waiters who keep the shop running are the threads, and each table is a task to be finished.

  With multithreading, the pattern is: every time a party sits down, assign that table its own waiter -- however many tables there are, that's how many waiters you need;

  With coroutines, the pattern is different: hire just one waiter. Eating involves ordering and then waiting for the food, so while table A is ordering, the waiter goes to serve B; when B has ordered and is waiting for the food, the waiter moves to C; when C is also waiting and A has finished ordering, the waiter hurries back to A... and so on.

  The example shows that to use coroutines, the tasks must involve waiting. When the work is time-consuming and IO-bound, running it with coroutines saves a lot of resources (one waiter versus many -- a superhuman who can run a whole shop alone, I'll take a dozen (-..-)), and makes much better use of the system.
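The one-waiter pattern above maps directly onto an event loop. Here is a minimal asyncio sketch (my own illustration, not from the linked articles): three "tables" wait for the kitchen concurrently, and the single event loop serves whichever is ready first.

```python
import asyncio

done_order = []   # delivery order, to show how the one "waiter" interleaves the tables

async def serve_table(name, kitchen_time):
    print(f'{name}: order taken')
    await asyncio.sleep(kitchen_time)   # the "waiting for food" step frees the waiter for other tables
    done_order.append(name)
    print(f'{name}: food delivered')

async def main():
    # one event loop (one waiter) handles all three tables concurrently
    await asyncio.gather(
        serve_table('A', 0.03),
        serve_table('B', 0.02),
        serve_table('C', 0.01),
    )

asyncio.run(main())
print(done_order)   # shortest kitchen time is served first: ['C', 'B', 'A']
```

Total wall time is about 0.03s, not 0.06s, because the waits overlap -- that is the whole point of the single-waiter model.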

I think the summary above is pretty good; follow the third link for the original article.

On to the code -- two different ways to start threads:

import threading
import time
import logging
import random



class MyThreading(threading.Thread):  # subclassing threading.Thread
    def __init__(self, num):
        super(MyThreading, self).__init__()
        self.num = num

    def run(self) -> None:
        time.sleep(random.random())
        logging.debug(str(self.__class__.__name__) + '======>' + 'Work: %s' % self.num)


def worker(num):
    """thread worker function"""
    time.sleep(random.random())
    # threading.current_thread().getName() returns the thread's name
    logging.debug(threading.current_thread().getName() + ':' + 'Worker: %s' % num)


# use logging to print the details
logging.basicConfig(level=logging.DEBUG, format="[%(levelname)s] (%(threadName)-10s) %(message)s")

threads = []

for i in range(5):
    my = MyThreading(i)
    my.start()
    t = threading.Thread(target=worker, args=(i,), name='sidian')   # starting a thread by passing a target callable
    t.start()
    threads.append(t)
    threads.append(my)

for i in threads:            # join() so the newly started threads finish before we log
    i.join()


logging.info('__main__:' + threading.current_thread().getName())
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t1.py
[DEBUG] (Thread-5  ) MyThreading======>Work: 4
[DEBUG] (sidian    ) sidian:Worker: 0
[DEBUG] (Thread-1  ) MyThreading======>Work: 0
[DEBUG] (Thread-4  ) MyThreading======>Work: 3
[DEBUG] (sidian    ) sidian:Worker: 4
[DEBUG] (sidian    ) sidian:Worker: 2
[DEBUG] (sidian    ) sidian:Worker: 1
[DEBUG] (Thread-2  ) MyThreading======>Work: 1
[DEBUG] (sidian    ) sidian:Worker: 3
[DEBUG] (Thread-3  ) MyThreading======>Work: 2
[INFO] (MainThread) __main__:MainThread

Process finished with exit code 0

 From the code, multithreading looks a lot like multiprocessing.

Next, a small benchmark to see some differences between threads and processes.

import multiprocessing
import threading
import time

n = 0
def run():
    global n
    for i in range(10000):
        n += i
    return n


def s_time(func):
    def wrap():
        t1 = time.perf_counter()
        res = func()
        print(f'({func.__name__})cost_time:{time.perf_counter()-t1:0.5f}')
        return res
    return wrap

@s_time
def run_threads():
    threads = []
    for i in range(10000):
        t = threading.Thread(target=run)
        t.start()
        threads.append(t)
    for i in threads:
        i.join()

@s_time
def run_process():
    threads = []
    for i in range(10000):
        t = multiprocessing.Process(target=run)
        t.start()
        threads.append(t)
    for i in threads:
        i.join()

run_threads()
run_process()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
(run_threads)cost_time:8.54433
(run_process)cost_time:20.15610

Process finished with exit code 0

 Above is the timing comparison for starting 10,000 processes versus 10,000 threads; the threads are clearly much faster.

Below are the results for 1,000 and for 100 each.

/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
(run_threads)cost_time:0.86798
(run_process)cost_time:1.20561

Process finished with exit code 0
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
(run_threads)cost_time:0.10313
(run_process)cost_time:0.12918

Process finished with exit code 0

 Overall, starting threads is clearly much faster than starting processes.

Now daemon threads. Like daemon processes, a daemon thread does not block the main thread; it is terminated when the main thread finishes. Threads are non-daemon by default.

import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s",
)

d = threading.Thread(target=daemon, name='daemon', daemon=True)   # create a daemon thread
# d.setDaemon(True)                  # the older alternative way to set it
t = threading.Thread(target=non_daemon, name='non_daemon')
d.start()
t.start()
d.join(0.1)         # join with a timeout; without one, join() would block until the daemon finishes
logging.info('d.isAlive' + str(d.is_alive()))  # check whether the thread is still alive
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t3.py
(daemon    ) Starting
(non_daemon) Starting
(non_daemon) Exiting
(MainThread) d.isAliveTrue

Process finished with exit code 0

 Next, a method I don't find all that practical: enumerating all threads.

import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()

main_thread = threading.main_thread()
logging.info(main_thread)                # the main thread
logging.info(threading.enumerate())
for t in threading.enumerate():       # enumerate all the threads
    if t is main_thread:           # skip the main thread
        continue
    logging.debug(f'joining {t.getName()}')    # join every non-main thread
    t.join()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t4.py
[DEBUG] (Thread-1  ) sleeping 0.20
[DEBUG] (Thread-2  ) sleeping 0.50
[DEBUG] (Thread-3  ) sleeping 0.30
[INFO] (MainThread) <_MainThread(MainThread, started 4757224896)>
[INFO] (MainThread) [<_MainThread(MainThread, started 4757224896)>, <Thread(Thread-1, started daemon 123145473859584)>, <Thread(Thread-2, started daemon 123145479114752)>, <Thread(Thread-3, started daemon 123145484369920)>]
[DEBUG] (MainThread) joining Thread-1
[DEBUG] (Thread-1  ) ending
[DEBUG] (MainThread) joining Thread-2
[DEBUG] (Thread-3  ) ending
[DEBUG] (Thread-2  ) ending
[DEBUG] (MainThread) joining Thread-3

Process finished with exit code 0

Honestly, why make it so complicated? Isn't it just the same to keep an empty list, append the threads you need to join, and loop over it?

For example:

import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

threads = []
for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()
    threads.append(t)

main_thread = threading.main_thread()
logging.info(main_thread)                # the main thread
logging.info(threading.enumerate())

# for t in threading.enumerate():       # enumerate all the threads
#     if t is main_thread:           # skip the main thread
#         continue
#     logging.debug(f'joining {t.getName()}')    # join every non-main thread
#     t.join()

for t in threads:
    t.join()

 Isn't that simpler?

Next, an even more obscure one: the timer thread. I genuinely can't figure out where a timer thread would normally be used.

import threading
import time
import logging

def delayed():
    logging.debug('worker running')

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

t1 = threading.Timer(0.3, delayed)   # run delayed() after 0.3 seconds
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug(f'waiting before canceling {t2.getName()}')
time.sleep(0.2)
logging.debug(f'canceling {t2.getName()}')
t2.cancel()             # cancel the timer before it fires
logging.debug('done')
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_timer.py
[DEBUG] (MainThread) starting timers
[DEBUG] (MainThread) waiting before canceling t2
[DEBUG] (MainThread) canceling t2
[DEBUG] (MainThread) done
[DEBUG] (t1        ) worker running

Process finished with exit code 0

 From the code, all I can see in Timer is delayed start plus the option to cancel part-way; it's genuinely hard for me to picture the use case.
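For what it's worth, one pattern I have seen Timer used for (my own example, not from the original) is a simple watchdog: arm a timer before a piece of work, and cancel it if the work finishes in time -- the callback only ever runs when the work overruns.

```python
import threading

timed_out = []          # the timer callback appends here if it ever fires

def on_timeout():
    timed_out.append(True)

# Arm a 5-second watchdog before starting the work.
watchdog = threading.Timer(5.0, on_timeout)
watchdog.start()

result = sum(range(100))   # the "work" -- finishes almost instantly here

watchdog.cancel()          # finished in time, so on_timeout never runs
print(result, timed_out)   # 4950 []
```

The same cancel-and-restart trick also works for debouncing: restart the timer on every event, and the callback only fires after a quiet period.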

Next up: Event, for signalling between threads. Since threads share data anyway, I think a global variable could also be a decent choice for simple signalling.

import logging
import threading
import time


def wait_for_event(e):
    """wait for the event to be set before doing anything"""
    logging.debug('wait for event starting')   # runs as soon as the thread starts
    event_is_set = e.wait()      # interesting: wait() actually has a return value
    logging.debug(f'event set: {event_is_set}')
    logging.debug(str(e.is_set()))

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():        # the event starts out unset
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)      # wait with a timeout; the return value says whether the event was set
        logging.debug('event set : %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(levelname)s] (%(threadName)-10s) %(message)s",
)

e = threading.Event()

t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()
t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_event.py
[DEBUG] (block     ) wait for event starting
[DEBUG] (nonblock  ) wait_for_event_timeout starting
[DEBUG] (MainThread) Waiting before calling Event.set()
[DEBUG] (nonblock  ) event set : False
[DEBUG] (nonblock  ) doing other work
[DEBUG] (nonblock  ) wait_for_event_timeout starting
[DEBUG] (MainThread) Event is set
[DEBUG] (block     ) event set: True
[DEBUG] (block     ) True
[DEBUG] (nonblock  ) event set : True
[DEBUG] (nonblock  ) processing event

Process finished with exit code 0

 As the output shows, e.wait() is a really useful method.

Next, locks. This part matters: the sections above were about synchronizing threads, while this is about controlling access to shared resources so data doesn't get corrupted or lost.

Python's built-in data structures (lists, dicts, and so on) are thread-safe -- a side effect of Python using atomic bytecodes to manage them (one benefit of the global interpreter lock).

Other data structures implemented in Python, and simple types such as integers and floats, don't have that protection. To guarantee safe concurrent access to an object, use a Lock.

Use locks with a `with` block whenever you can, so you can't forget to release and end up in a deadlock.
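The `with` form expands to roughly the acquire/try/finally pattern shown below -- a quick sketch of the equivalence:

```python
import threading

lock = threading.Lock()
shared = []

# The with-block: acquires on entry, releases on exit, even if an exception is raised.
with lock:
    shared.append(1)

# The manual equivalent -- easy to forget release() on an error path without the finally.
lock.acquire()
try:
    shared.append(2)
finally:
    lock.release()

print(shared)   # [1, 2]
```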

import logging
import threading, time

logging.basicConfig(level=logging.INFO)

# 10 threads -> 100 cups
cups = []
lock = threading.Lock()


def worker(lock=None, task=100):
    while True:
        count = len(cups)   # when count reaches 99, several threads can pass this check at once and over-fill cups
        if count >= task:
            break
        logging.info(count)
        cups.append(1)
        logging.info("{} make 1........ ".format(threading.current_thread().getName()))
    logging.info("{} ending=======>".format(len(cups)))


for x in range(10):
    threading.Thread(target=worker, args=(lock, 100)).start()
INFO:root:104 ending=======>
INFO:root:104 ending=======>
INFO:root:Thread-6 make 1........ 
INFO:root:105 ending=======>
INFO:root:105 ending=======>
INFO:root:105 ending=======>
import logging
import threading, time

logging.basicConfig(level=logging.INFO)

# 10 -> 100cups
cups = []
lock = threading.Lock()


def worker(lock=None, task=100):
    with lock:           # now with the lock held
        while True:
            count = len(cups)     # the over-fill from the unlocked version can no longer happen
            if count >= task:
                break
            logging.info(count)
            cups.append(1)
            logging.info("{} make 1........ ".format(threading.current_thread().getName()))
        logging.info("{} ending=======>".format(len(cups)))


for x in range(10):
    threading.Thread(target=worker, args=(lock, 100)).start()
INFO:root:Thread-1 make 1........ 
INFO:root:96
INFO:root:Thread-1 make 1........ 
INFO:root:97
INFO:root:Thread-1 make 1........ 
INFO:root:98
INFO:root:Thread-1 make 1........ 
INFO:root:99
INFO:root:Thread-1 make 1........ 
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>
INFO:root:100 ending=======>

 With the lock, the numbers come out right. But locking is a real pain: here it basically reduces the benefit of multithreading to zero, because the lock turns the parallel threads into serial ones and efficiency drops a lot.
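One way to win some parallelism back is to shrink the critical section: hold the lock only around the shared check-and-append, not around the whole loop, so threads can interleave between iterations. A sketch of that variant (my own rewrite of the cups example):

```python
import threading

lock = threading.Lock()
cups = []

def worker(task=100):
    while True:
        with lock:                  # critical section: just the check and the append
            if len(cups) >= task:
                break               # leaving the with-block releases the lock
            cups.append(1)
        # any per-item work out here would run without holding the lock

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(cups))   # exactly 100, every run
```

With the lock scoped like this, the count stays correct, and only the moments that actually touch the shared list are serialized.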

Lock methods:
acquire(blocking=True, timeout=-1)  acquires the lock. Blocking by default; a blocking acquire can be given a timeout. A non-blocking acquire returns True when it gets the lock, False otherwise.

This one is quite fun: when you pass 0 to acquire, every thread gets to "take" the lock, but a False return value means what you took was a fake.

l = threading.Lock()

r1 = l.acquire(0)
r2 = l.acquire(0)
l.release()
r3 = l.acquire(0)
r4 = l.acquire(0)
logging.info(f'r1>{r1};r2>{r2};r3>{r3};r4>{r4}')
INFO:root:r1>True;r2>False;r3>True;r4>False

 Quite interesting: the return value tells you who got the real lock, everyone can keep trying to take it, and once the lock is released, the first caller to re-acquire gets True.
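acquire() also takes a timeout, which sits between fully blocking and the acquire(0) trick above: wait a bounded time, then give up. A small sketch:

```python
import threading

lock = threading.Lock()
lock.acquire()                     # the main thread holds the lock

def try_with_timeout(results):
    # Block for at most 0.1s; returns False because the lock is still held.
    results.append(lock.acquire(timeout=0.1))

results = []
t = threading.Thread(target=try_with_timeout, args=(results,))
t.start()
t.join()
lock.release()

print(results)   # [False]
```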

A quick aside -- Liao Xuefeng's threading.local example:

import threading

# create a global ThreadLocal object (in practice, each thread that uses it ends up with its own copy of the data):
local_school = threading.local()


def process_student():
    # fetch the student bound to the current thread:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().getName()))


def process_thread(name):
    # bind student to the ThreadLocal:
    local_school.student = name
    process_student()


t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

 This threading.local really does give every thread its own independent copy. Honestly, if it were me, I'd rather pass the value down through function arguments -- more direct and cleaner.

And I suspect creating these objects costs a fair bit of memory.

For the less-used modules below, I'll copy code from The Python 3 Standard Library by Example and add my own notes.

Also, since threads share memory, you can use the queue module if you want to avoid the low efficiency that locks bring.

The queue module: import queue; the usage is the same as the multiprocessing Queue.

class queue.Queue(maxsize=0)  # first in, first out

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
Result (first in, first out):
first
second
third
'''

class queue.LifoQueue(maxsize=0)  # last in, first out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
Result (last in, first out):
third
second
first
'''

class queue.PriorityQueue(maxsize=0)  # a queue whose items carry a priority

import queue

q=queue.PriorityQueue()
# put takes a tuple; the first element is the priority (usually a number, though anything comparable works) -- the smaller the number, the higher the priority
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
Result (smaller number = higher priority; higher priority dequeues first):
(10, 'b')
(20, 'a')
(30, 'c')
'''

ok
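Beyond get/put, Queue also has task_done() and join(), which let a producer wait until the consumers have drained everything -- handy for the producer/consumer model below. A quick sketch:

```python
import queue
import threading

# q.join() blocks until every item that was put() has been marked
# with task_done() -- i.e. until the workers have drained the queue.
q = queue.Queue()
processed = []

def worker():
    while True:
        item = q.get()
        processed.append(item * 2)
        q.task_done()

t = threading.Thread(target=worker, daemon=True)
t.start()

for i in range(5):
    q.put(i)

q.join()            # returns once all five items have been processed
print(processed)    # [0, 2, 4, 6, 8]
```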

 You can also use queue to build a producer/consumer model. Here's a quick one of mine:

import threading
import queue
import logging
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)s) => %(message)s'
)

def consumer(q):
    while True:
        res = q.get()
        if res is None:
            logging.debug('nothing left to eat, I am leaving')
            break
        logging.debug(f'now eating {res}')


def product(q):
    for i in ('pork', 'milk', 'egg'):
        logging.debug(f'putting {i}')
        q.put(i)
        time.sleep(2)
    q.put(None)     # sentinel to signal the end

if __name__ == '__main__':
    q = queue.Queue()
    c = threading.Thread(target=consumer, args=(q,))
    p = threading.Thread(target=product, args=(q,))
    c.start()
    p.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_queue.py
[DEBUG] (Thread-2) => putting pork
[DEBUG] (Thread-1) => now eating pork
[DEBUG] (Thread-2) => putting milk
[DEBUG] (Thread-1) => now eating milk
[DEBUG] (Thread-2) => putting egg
[DEBUG] (Thread-1) => now eating egg
[DEBUG] (Thread-1) => nothing left to eat, I am leaving

Process finished with exit code 0

 Next, I'll start copying and annotating code.

First: besides Event, threads can also be synchronized with a Condition object.

import logging
import threading
import time

def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:           # must hold the condition before waiting on it
        cond.wait()      # start waiting
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:       # must hold the condition before notifying
        logging.debug('Making resource available')
        cond.notifyAll()    # notify all waiters (spelled notify_all() in newer code)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
# print(dir(condition))
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threding_condition.py
2019-12-12 02:16:24,131 (c1) Starting consumer thread
2019-12-12 02:16:24,334 (c2) Starting consumer thread
2019-12-12 02:16:24,535 (p ) Starting producer thread
2019-12-12 02:16:24,536 (p ) Making resource available
2019-12-12 02:16:24,536 (c1) Resource is available to consumer
2019-12-12 02:16:24,536 (c2) Resource is available to consumer

Process finished with exit code 0

A barrier is another thread-synchronization mechanism. A Barrier establishes a control point: every participating thread blocks there until all of them have arrived.

Once the number of waiting threads reaches the count you configured, they are all released together. A rather fun gadget.

import threading
import time


def work(barrier):
    print(threading.current_thread().getName(),
          'waiting for barrier with {} others'.format(barrier.n_waiting))
    worker_id = barrier.wait()      # block until the configured number of threads arrive; wait() returns each thread's index (0..N-1), so seeing 2 means all three made it
    # print('worker_id_num====>',worker_id)
    print(threading.current_thread().getName(), 'after barrier', worker_id)

NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)           # create the barrier outside and pass it into the worker

threads = [
    threading.Thread(
        name=f'worker-{i}',
        target=work,
        args=(barrier,),
    )
    for i in range(6)           # start 6 threads; if the count weren't a multiple of the barrier size, the leftover threads would block forever
]

for t in threads:
    print(t.getName(),'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_barrier.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
worker-3 starting
worker-3 waiting for barrier with 0 others
worker-4 starting
worker-4 waiting for barrier with 1 others
worker-5 starting
worker-5 waiting for barrier with 2 others
worker-5 after barrier 2
worker-4 after barrier 1
worker-3 after barrier 0

Process finished with exit code 0

 Next, using abort() to avoid the hang when the thread count can't fill the barrier.

import threading
import time


def work(barrier):
    print(threading.current_thread().getName(),
          'waiting for barrier with {} others'.format(barrier.n_waiting))
    try:
        worker_id = barrier.wait()      # block until the configured number of threads arrive; wait() returns each thread's index
    # print('worker_id_num====>',worker_id)
    except threading.BrokenBarrierError:
        print(threading.current_thread().getName(), 'aborting')
    else:
        print(threading.current_thread().getName(), 'after barrier', worker_id)

NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)           # 外部定义barrier传入运作函数里面。

threads = [
    threading.Thread(
        name=f'worker-{i}',
        target=work,
        args=(barrier,),
    )
    for i in range(4)           # start 4 threads; the leftover fourth can never fill the 3-party barrier on its own
]

for t in threads:
    print(t.getName(),'starting')
    t.start()
    time.sleep(0.1)

barrier.abort()             # abort() breaks the barrier: threads that can no longer be satisfied raise BrokenBarrierError, caught above

for t in threads:
    t.join()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_barrier_abort.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
worker-3 starting
worker-3 waiting for barrier with 0 others
worker-3 aborting

Process finished with exit code 0

 Two more to go. First, limiting concurrent access to a resource. Sometimes you need to allow several worker threads to access a resource at the same time while capping the total. For example, a connection pool supports concurrent connections but with a fixed size, or a network application allows a fixed number of concurrent downloads. A Semaphore manages this.

import logging
import random
import threading
import time

class ActivePool:

    def __init__(self):
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):    # without the lock
        # with self.lock:
        self.active.append(name)
        time.sleep(1)
        logging.debug(f'A{threading.current_thread().getName()}-Running: {self.active}')

    def makeInactive(self,name):
        # with self.lock:
        self.active.remove(name)
        # time.sleep(0.1)
        logging.debug(f'I{threading.current_thread().getName()}-Running: {self.active}')

def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:                                          # acquire and release would also work
        # time.sleep(0.1)
        name = threading.current_thread().getName()
        print(f'{name} is coming.')
        pool.makeActive(name)
        # time.sleep(0.2)
        pool.makeInactive(name)
        # time.sleep(0.1)
        print(f'{name} is out.')

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)       # cap the number of concurrent holders at 2

def spend_time(func):
    def warp():
        t1 = time.perf_counter()
        func()
        t2 = time.perf_counter()
        print(f'speng_time is :{t2 -t1:0.5f}')
    return warp

@spend_time
def run():
    threads = []
    for i in range(4):
        t = threading.Thread(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


run()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/therading_semaphonre.py
0 is coming.
1 is coming.
2019-12-13 03:22:01,429 (0 ) Waiting to join the pool
2019-12-13 03:22:01,429 (1 ) Waiting to join the pool
2019-12-13 03:22:01,430 (2 ) Waiting to join the pool
2019-12-13 03:22:01,430 (3 ) Waiting to join the pool
0 is out.
2 is coming.1 is out.

3 is coming.
2019-12-13 03:22:02,429 (0 ) A0-Running: ['0', '1']
2019-12-13 03:22:02,429 (0 ) I0-Running: ['1']
2019-12-13 03:22:02,430 (1 ) A1-Running: ['1']
2019-12-13 03:22:02,430 (1 ) I1-Running: []
2019-12-13 03:22:03,430 (3 ) A3-Running: ['2', '3']
2019-12-13 03:22:03,431 (3 ) I3-Running: ['2']
2019-12-13 03:22:03,431 (2 ) A2-Running: ['2']
2019-12-13 03:22:03,431 (2 ) I2-Running: []
3 is out.
2 is out.
speng_time is :2.00197

Process finished with exit code 0

 And with the lock:

import logging
import random
import threading
import time

class ActivePool:

    def __init__(self):
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):    # now with the lock
        with self.lock:
            self.active.append(name)
            time.sleep(1)
            logging.debug(f'A{threading.current_thread().getName()}-Running: {self.active}')

    def makeInactive(self,name):
        with self.lock:
            self.active.remove(name)
            # time.sleep(0.1)
            logging.debug(f'I{threading.current_thread().getName()}-Running: {self.active}')

def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:                                          # acquire and release would also work
        # time.sleep(0.1)
        name = threading.current_thread().getName()
        print(f'{name} is coming.')
        pool.makeActive(name)
        # time.sleep(0.2)
        pool.makeInactive(name)
        # time.sleep(0.1)
        print(f'{name} is out.')

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)       # cap the number of concurrent holders at 2

def spend_time(func):
    def warp():
        t1 = time.perf_counter()
        func()
        t2 = time.perf_counter()
        print(f'speng_time is :{t2 -t1:0.5f}')
    return warp

@spend_time
def run():
    threads = []
    for i in range(4):
        t = threading.Thread(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


run()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/therading_semaphonre.py
2019-12-13 03:24:15,506 (0 ) Waiting to join the pool
2019-12-13 03:24:15,506 (1 ) Waiting to join the pool
2019-12-13 03:24:15,506 (2 ) Waiting to join the pool
2019-12-13 03:24:15,507 (3 ) Waiting to join the pool
0 is coming.1 is coming.

2019-12-13 03:24:16,507 (1 ) A1-Running: ['1']
2019-12-13 03:24:16,507 (1 ) I1-Running: []
1 is out.
2 is coming.
2019-12-13 03:24:17,509 (0 ) A0-Running: ['0']
2019-12-13 03:24:17,510 (0 ) I0-Running: []
0 is out.
3 is coming.
2019-12-13 03:24:18,512 (2 ) A2-Running: ['2']
2019-12-13 03:24:18,512 (2 ) I2-Running: []
2 is out.
2019-12-13 03:24:19,514 (3 ) A3-Running: ['3']
2019-12-13 03:24:19,515 (3 ) I3-Running: []
3 is out.
speng_time is :4.00896

Process finished with exit code 0

 From my own experiments, whether to lock really depends on the case: for operations Python already protects, like appending to a list or updating a dict, there is no need to lock -- otherwise efficiency gets very low.
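To be precise about which operations do need the lock: a single-bytecode operation like list.append is effectively atomic under the GIL, but `n += 1` is a read-modify-write spread over several bytecodes, so concurrent increments can interleave and lose updates. A sketch of the locked counter (the unlocked version can come up short, though small runs often happen to look right):

```python
import threading

n = 0
lock = threading.Lock()

def add(times):
    global n
    for _ in range(times):
        with lock:      # protect the read-modify-write of the plain integer
            n += 1

threads = [threading.Thread(target=add, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(n)   # 40000 every run; drop the lock and the total may fall short
```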

While testing, I also noticed that the arguments passed into each thread's target function belong to that call alone: they are local variables and don't interfere across threads.

Finally, back to the local class from earlier, recorded in more detail following The Python 3 Standard Library by Example. (It's mainly for functions calling functions, with each thread operating on its own copy of the attribute values -- think of each person operating on their own bank balance.)

Some resources need to be locked so that several threads can use them; others need to be protected so that they are hidden from threads that are not their "owners".

The local() class creates an object that hides values so they cannot be seen from other threads. (Typically used across nested function calls: the first function that needs it sets attributes on the object, and every function below can read them directly.)

Compared with Liao Xuefeng's explanation, I find the wording in this translated textbook easier to understand.

import random
import threading
import logging

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug(f'value={val}')

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)        # set an attribute on the object
    show_value(data)

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
local_data = threading.local()
show_value(local_data)            # the main thread runs first
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))   # start two threads, passing in the local object
    t.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_local_book.py
(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-1  ) value=90
(Thread-2  ) No value yet
(Thread-2  ) value=62

Process finished with exit code 0

The code above shows that each thread gets its own value for the object's attribute. If you want the object to come with a default value in every thread,

subclass local and set the attribute in __init__. (Bank-account style usage -- mixing this class in for multithreaded work is actually quite convenient.)

import random
import threading
import logging

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug(f'value={val}')

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)        # 对对象的属性进行赋值
    show_value(data)

class MyLocal(threading.local):    # subclassing threading.local

    def __init__(self,value):
        super(MyLocal, self).__init__()
        logging.debug('Initializing %r', self)
        self.value = value    # give the object an initial value in every thread

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))   # start two threads, passing in the local object
    t.start()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_loacl_defaults.py
(MainThread) Initializing <__main__.MyLocal object at 0x1060acc20>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at 0x1060acc20>
(Thread-1  ) value=1000
(Thread-1  ) value=53
(Thread-2  ) Initializing <__main__.MyLocal object at 0x1060acc20>
(Thread-2  ) value=1000
(Thread-2  ) value=99

Process finished with exit code 0

One last reminder to myself: whether it's Event, Condition, Semaphore, local, or the rest, the pattern is the same -- create the object, configure it outside, pass it into the worker functions, and call its methods inside them.

That's it for this first pass. Overall, multithreading and multiprocessing share a lot of operations; for everyday concurrency, threads are usually the better choice -- less memory use, faster startup.

Threads also offer more synchronization primitives to choose from, and there is no data-passing problem since memory is shared.

Original post: https://www.cnblogs.com/sidianok/p/12020060.html