多线程

模块:threading

  Thread

开启方式,join()方法、互斥锁、守护线程与多进程相同

进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

与多进程的区别:

  1、开进程的开销远大于开线程

  2、同一进程内的多个线程共享该进程的地址空间

# mutex
# 互斥锁:牺牲效率,保证数据安全
from threading import Thread, Lock
import time

n = 100


def task():  # 多进程互斥锁必须传参?
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.01)
    n = temp - 1
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    # time.sleep(0.1)
    print('', n)
View Code

GIL全局解释器锁

GIL是cpython解释器的特性,不是python的特性
GIL保证同一时间只能有一个线程被执行
当多线程的python代码执行时,所有线程都去抢GIL锁,谁抢到执行谁
  当该线程执行完或者该线程内遇到IO操作时,释放GIL,其他线程去抢GIL
  如果这多个线程操作同一个内存数据,就会导致错误
  所以要保证数据安全,就要在多线程任务中自定义一把互斥锁,
  即使第一次抢到GIL的线程遇到IO时,由于互斥锁未释放,其他线程也不会被执行
# 计算密集型:用多进程
# from multiprocessing import Process
# from threading import Thread
# import os,time
# def work():
#     res=0
#     for i in range(100000000):
#         res*=i
#
#
# if __name__ == '__main__':
#     l=[]
#     # print(os.cpu_count()) #本机为8核
#     start=time.time()
#     for i in range(8):
#         # p=Process(target=work) #耗时8s多
#         p=Thread(target=work) #耗时37s多
#         l.append(p)
#         p.start()
#     for p in l:
#         p.join()
#     stop=time.time()
#     print('run time is %s' %(stop-start))


# IO密集型:用多线程
from multiprocessing import Process
from threading import Thread
import threading
import os,time

def work():
    time.sleep(2)

if __name__ == '__main__':
    l=[]
    # print(os.cpu_count()) #本机为4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗时2.697多,大部分时间耗费在创建进程上
        p=Thread(target=work) #耗时2.02多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))
View Code

死锁与递归锁:

互斥锁只能acquire一次,释放之后才能被再次获取
递归锁:可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,才能被抢到acquire
from threading import Thread,Lock
import time

mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 拿到了A锁' %self.name)

        mutexB.acquire()
        print('%s 拿到了B锁' %self.name)
        mutexB.release()

        mutexA.release()


    def f2(self):
        mutexB.acquire()
        print('%s 拿到了B锁' % self.name)
        time.sleep(0.1)

        mutexA.acquire()
        print('%s 拿到了A锁' % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
死锁
from threading import Thread, RLock
import time

mutexB = mutexA = RLock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 拿到了A锁' % self.name)

        mutexB.acquire()
        print('%s 拿到了B锁' % self.name)
        mutexB.release()

        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s 拿到了B锁' % self.name)
        time.sleep(7)

        mutexA.acquire()
        print('%s 拿到了A锁' % self.name)
        mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
递归锁

信号量

也是锁,互斥锁可比喻为一件房子只有一把锁,一次只有一个人能进入
信号量可比喻为一间房安装了多把锁,抢到任何一把锁都能进入
from threading import Thread, Semaphore, currentThread
import time, random

sm = Semaphore(3)


def task():
    # sm.acquire()
    # print('%s in' %currentThread().getName())
    # sm.release()
    with sm:
        print('%s in' % currentThread().getName())
        time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()
View Code

Event事件

  Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

from threading import Event

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。
from threading import Thread, Event
import time

event = Event()
# event.wait()
# event.set()


def student(name):
    print('学生%s 正在听课' % name)
    event.wait(3)  # 超时时间,到时间后自动往下执行
    print('学生%s 课间活动' % name)


def teacher(name):
    print('老师%s 正在授课' % name)
    time.sleep(5)
    print('下课')
    event.set()


if __name__ == '__main__':
    stu1 = Thread(target=student, args=('alex',))
    stu2 = Thread(target=student, args=('wxx',))
    stu3 = Thread(target=student, args=('yxx',))
    t1 = Thread(target=teacher, args=('egon',))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()
View Code

定时器:指定n秒后执行某操作

from threading import Timer


def task(name):
    print('hello %s' % name)


t = Timer(5, task, args=('egon',))
t.start()
View Code
from threading import Timer
import random


class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self, interval=5):
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval, self.make_cache)
        self.t.start()

    def make_code(self, n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0, 9))
            s2 = chr(random.randint(65, 90))
            res += random.choice([s1, s2])
        return res

    def check(self):
        while True:
            code = input('请输入你的验证码>>: ').strip()
            if code.upper() == self.cache:
                print('验证码输入正确')
                self.t.cancel()
                break


obj = Code()
obj.check()
View Code

队列

import queue

q = queue.Queue(3)  # 先进先出->队列

q.put('first')
q.put(2)
q.put('third')
print(3)
q.put(4)
print(4)
# q.put(4,block=False) #q.put_nowait(4)
# q.put(4,block=True,timeout=3)
#
#
# #
# print(q.get())
# print(q.get())
# print(q.get())
# # print(q.get(block=False)) #q.get_nowait()
# # print(q.get_nowait())
#
# # print(q.get(block=True,timeout=3))


# q=queue.LifoQueue(3) #后进先出->堆栈
# q.put('first')
# q.put(2)
# q.put('third')
#
# print(q.get())
# print(q.get())
# print(q.get())

#
# q=queue.PriorityQueue(3) #优先级队列
#
# q.put((10,'one'))
# q.put((40,'two'))
# q.put((30,'three'))
#
# print(q.get())
# print(q.get())
# print(q.get())
View Code

进程池、线程池

基本方法
1、submit(fn, *args, **kwargs)
异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

3、shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4、result(timeout=None)
取得结果

5、add_done_callback(fn)
回调函数
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import os,time,random
#
# def task(name):
#     print('name:%s pid:%s run' %(name,os.getpid()))
#     time.sleep(random.randint(1,3))
#
#
# if __name__ == '__main__':
#     # pool=ProcessPoolExecutor(4)  # 不指定则默认使用cpu核数
#     pool=ThreadPoolExecutor(5)
#
#     for i in range(10):
#         pool.submit(task,'egon%s' %i)
#
#     pool.shutdown(wait=True)  # 相当于join()
#
#
#     print('主')


from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import currentThread
import os, time, random


def task():
    print('name:%s pid:%s run' % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)

    for i in range(10):
        pool.submit(task, )

    pool.shutdown(wait=True)

    print('')
View Code

提交任务的两种方式

1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

  同步 != 阻塞

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' %name)
    time.sleep(random.randint(3,5))
    res=random.randint(7,13)*'#'
    return {'name':name,'res':res}

def weigh(shit):
    name=shit['name']
    size=len(shit['res'])
    print('%s 拉了 《%s》kg' %(name,size))


if __name__ == '__main__':
    pool=ThreadPoolExecutor(13)

    shit1=pool.submit(la,'alex').result()
    weigh(shit1)

    shit2=pool.submit(la,'wupeiqi').result()
    weigh(shit2)

    shit3=pool.submit(la,'yuanhao').result()
    weigh(shit3)
View Code

2、异步调用:提交完任务后,不地等待任务执行完毕,异步通常伴随着回调

  举个例子,某部门经理X手下原本只有一个员工A,X每天早上给A安排了任务(先干这再干那再再干那)后就回去喝茶了,后来老板又给X安排了两个员工B和C,以后X就要给三个人安排任务了,A去干这个,B去干那个,C再去干什么什么,X怎么知道他们三个有没有完成任务呢?他们主动给经理报告呗。

from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13) * '#'
    return {'name': name, 'res': res}


def weigh(shit):
    shit = shit.result()
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    pool.submit(la, 'alex').add_done_callback(weigh)

    pool.submit(la, 'wupeiqi').add_done_callback(weigh)

    pool.submit(la, 'yuanhao').add_done_callback(weigh)
View Code
原文地址:https://www.cnblogs.com/webc/p/9168480.html