Learning multithreading and multiprocessing in Python: threading, the thread-safe queue, reading shared state between threads, threading.local() and threading.RLock()

http://www.cnblogs.com/alex3714/articles/5230609.html

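Python's multithreading (in CPython) works by context switching between threads. Because of the GIL only one thread executes Python bytecode at a time, so a multithreaded program effectively uses a single CPU core. Threads are therefore a poor fit for CPU-bound tasks and a good fit for I/O-bound tasks (for example, many concurrent HTTP GET requests fetching web pages).

I/O operations spend their time waiting and use almost no CPU.

Computation occupies the CPU, for example evaluating 1+1.

What is a thread?

A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the unit that actually does the process's work. A thread is a single sequential flow of control within a process; one process can run several threads concurrently, each carrying out a different task.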

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped.

One way to achieve that is by jotting down the page number, line number, and word number.

So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped.

Then you can take it back, and resume it from where you were.

Threads work in the same way.

A CPU is giving you the illusion that it's doing multiple computations at the same time.

It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation.

Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes.

A thread is a context of execution, while a process is a bunch of resources associated with a computation.

A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

What is a process?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program.

A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution.

Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

How do processes and threads differ?

[Strictly speaking the two are not comparable: a process is only a collection of resources, while a thread is what actually executes.]

Starting a thread is faster than starting a process. Starting a process is like building a room; starting a thread is like bringing one more person into an existing room.

  1. Threads share the address space of the process that created them; processes have their own address space.
  2. Threads have direct access to the data segment of their process; processes have their own copy of the data segment of the parent process. In the room analogy, two child rooms each receive a complete copy of the data.
  3. Threads can directly communicate with other threads of their process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.

Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
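The first point in the list can be checked directly. The following is a minimal sketch (the names shared, worker and run_demo are made up for this illustration): three threads append to one list that belongs to the process, and the main thread sees every write without any copying or interprocess communication.

```python
import threading

shared = []  # lives in the process's memory, visible to every thread


def worker(tag):
    # The thread writes straight into the list owned by the process.
    shared.append(tag)


def run_demo():
    shared.clear()
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(shared)


print(run_demo())  # [0, 1, 2]
```

A child process started from the same program would instead work on its own copy of shared, which is why processes need explicit interprocess communication.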

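The Python Global Interpreter Lock (GIL):

Multithreading in CPython cannot take advantage of multiple CPU cores: the GIL guarantees that only one thread executes Python bytecode at any given moment. For example, when a two-thread busy loop runs on a dual-core CPU, each core is only about 50% utilized.

Example: start two threads that each spin in an infinite while loop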

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/2 14:30
#  @Author:zhangmingda
#  @File: 双线程死循环.py
#  @Software: PyCharm
#  Description: test whether two threads can fully use a dual-core CPU

import threading

def test():
    while True:
        pass
t1 = threading.Thread(target=test)
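t1.start() # the child thread spins forever and keeps a CPU busy

# the main thread spins too
while True:
    pass

Watching the load in htop shows each core at only about 50%, so Python threads do not run in true parallel for CPU-bound work.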
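To see the effect without htop, the sketch below times the same CPU-bound loop run twice in a row and then in two threads. It assumes a standard CPython build with the GIL; count_down, timed_sequential and timed_threaded are illustrative names, and the timings depend on the machine, so none are shown.

```python
import threading
import time


def count_down(n):
    # Pure-Python CPU-bound loop with no I/O to wait on.
    while n > 0:
        n -= 1


def timed_sequential(n):
    start = time.perf_counter()
    count_down(n)
    count_down(n)
    return time.perf_counter() - start


def timed_threaded(n):
    start = time.perf_counter()
    threads = [threading.Thread(target=count_down, args=(n,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


N = 1_000_000
print('sequential : %.3fs' % timed_sequential(N))
print('two threads: %.3fs' % timed_threaded(N))
```

On a GIL build the two timings are usually close to each other, because the threads take turns instead of running in parallel.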

The thread module (named _thread in Python 3) is a fairly low-level module.

The threading module wraps it and is more convenient to use.

Example 1: creating child threads in a loop with threading.Thread()

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/1 17:29
#  @Author:zhangmingda
#  @File: threading_study.py
#  @Software: PyCharm
#  Description:

import time,threading
# The usual way to start many threads: a for loop
def run(n):
    print('task:',n)
    time.sleep(2)
    print('task :%s done'% n)
t_obj = []
# Each thread is started right after the previous one, without waiting for it to finish.
# To wait for every thread to finish, call .join() on each of them.
start_time = time.time()

for i in range(50):
    t = threading.Thread(target=run,args=(i,))
    t.start() # start all the threads in a loop
    t_obj.append(t) # keep every thread object
for i in t_obj: # the main thread continues only after every thread has finished
    i.join()
print('cost_time:',time.time() - start_time)

Output

task :49 done
task :48 done
task :47 done
cost_time: 2.006685256958008

Process finished with exit code 0

-------------------------------------------

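Example 2: creating child threads in a loop without joining them (the main thread does not wait, and reaching the end of the main thread's code does not stop the child threads)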

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/1 17:29
#  @Author:zhangmingda
#  @File: threading_study.py
#  @Software: PyCharm
#  Description:

import time,threading
# The usual way to start many threads: a for loop
def run(n):
    print('task:',n)
    time.sleep(2)
    print('task :%s done'% n)
t_obj = []
# Each thread is started right after the previous one, without waiting for it to finish.
# To wait for every thread to finish, call .join() on each of them.
start_time = time.time()

for i in range(50):
    t = threading.Thread(target=run,args=(i,))
    t.start() # start all the threads in a loop
    t_obj.append(t) # keep every thread object
# for i in t_obj: # joins disabled here: the main thread does not wait
#     i.join()
print('cost_time:',time.time() - start_time)

Output

......

task: 49
cost_time: 0.006013393402099609
task :1 done
task :0 done

.........

Example 3: the object-oriented style. Subclass threading.Thread and override its run() method.

#!/usr/bin/env python
# Author:Zhangmingda
import time,threading

# Object-oriented style
class Mythread(threading.Thread):
    def __init__(self,n):
        super(Mythread,self).__init__()
        self.n = n
    def run(self):
        print('running:',self.n)

t1 = Mythread('t1')
t2 = Mythread('t2')
t1.start()
t2.start()

#!/usr/bin/env python
# Author:Zhangmingda
# Object-oriented style, started in a loop
import threading,time

class Mythread(threading.Thread):
    def __init__(self,n):
        super(Mythread,self).__init__()
        self.n = n
    def run(self):
        print('running:',self.n)
        time.sleep(1)

t_obj = []
start_time = time.time()
for i in range(50):
    t = Mythread(i)
    t.start()
    t_obj.append(t)
for i in t_obj:
    i.join()
print('running OVER:time:',time.time() - start_time)

Output

......

running: 49
running OVER:time: 1.008784294128418

Process finished with exit code 0

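Example 4: daemon threads, whose lifetime follows the main thread: threading.Thread().setDaemon(True) (deprecated since Python 3.10 in favor of the daemon attribute)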

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/1 17:51
#  @Author:zhangmingda
#  @File: thread_daemon.py
#  @Software: PyCharm
#  Description:
import time,threading
# The usual way to start many threads: a for loop
def run(n):
    print('task:',n)
    time.sleep(2) # every child thread sleeps for two seconds
    print("task %s end"% n) # never reached: the program exits first
start_time = time.time()

for i in range(50):
    t = threading.Thread(target=run,args=(i,))
    t.daemon = True # mark as a daemon thread (same as the older t.setDaemon(True)): it is killed when the main thread exits
    t.start() # start all the threads in a loop
time.sleep(1) # the main thread sleeps 1 second; the daemon threads end together with it
print('cost_time:',time.time() - start_time)
# The script therefore exits on its own after a little over 1 second

Output

...

task: 49
cost_time: 1.0092644691467285

Process finished with exit code 0
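The daemon flag can also be passed to the constructor. The following is a small sketch, assuming Python 3.3 or later (background is a hypothetical long-running task): the main thread waits only briefly with join(timeout=...) and then moves on while the daemon thread is still alive.

```python
import threading
import time


def background():
    time.sleep(30)  # far longer than the main thread is willing to wait


# daemon=True has the same effect as setting t.daemon = True before start()
t = threading.Thread(target=background, daemon=True)
t.start()
t.join(timeout=0.1)  # wait at most 0.1 s, then give up

print(t.daemon, t.is_alive())  # True True
```

Because the thread is a daemon, the interpreter does not wait for the remaining sleep when the main thread ends.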

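Counting threads: len(threading.enumerate()) and threading.active_count(), the number of threads currently alive (threading.activeCount() is the older alias, deprecated since Python 3.10)

Example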

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/1 19:32
#  @Author:zhangmingda
#  @File: threading_enumerate.py
#  @Software: PyCharm
#  Description:
import threading
from time import sleep,ctime

def sing():
    for i in range(3):
        print("singing...%d"%i)
        sleep(1)

def dance():
    for i in range(3):
        print("dancing...%d"%i)
        sleep(1)

if __name__ == '__main__':
    print('---start---:%s'%ctime())

    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)

    t1.start()
    t2.start()

    while True:
        length = len(threading.enumerate())
        print('threads from enumerate(): %d'%length)
        print('threads from active_count(): %d'% threading.active_count())
        if length<=1:
            break
        sleep(0.5)

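Summary

  1. Every thread has a name. The example above never sets name on its Thread objects, but Python assigns one automatically.
  2. A thread is finished when its run() method returns.
  3. You cannot control the thread scheduler directly, but you can influence how threads are scheduled by other means.
  4. A thread passes through several states.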
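Points 1 and 2 can be observed with a few lines. This is only an illustrative sketch (report and the name worker-a are made up); the exact form of the automatic name differs between Python versions, so it is printed rather than assumed.

```python
import threading


def report(results):
    # Record the name of the thread that runs this function.
    results.append(threading.current_thread().name)


results = []
auto = threading.Thread(target=report, args=(results,))                    # name assigned by Python
named = threading.Thread(target=report, args=(results,), name='worker-a')  # explicit name

for t in (auto, named):
    t.start()
for t in (auto, named):
    t.join()

print(auto.name, named.name)
print(auto.is_alive(), named.is_alive())  # False False: run() has returned
```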

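Mutex locks: threading.Lock() and acquire(blocking=True/False) (the shared global variable problem)

When threads run concurrently, the global interpreter lock (GIL) alone is not enough to protect shared data. For example: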

#!/usr/bin/env python
# Author:Zhangmingda

import threading,time

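num = 0
def run():
    global num
    num +=1  # read-modify-write on a shared global, not atomic

t_obj = []
for i in range(1000):
    t = threading.Thread(target=run)
    t.start()
    t_obj.append(t)
for t in t_obj:
    t.join()  # wait for every thread so that the final value is printed
print('num:',num)

Running this repeatedly with Python 2 on Ubuntu, two threads occasionally modify the variable at the same moment: after 1000 increments of the global, the final result can be less than 1000.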
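One common fix for such lost updates is to guard the increment with a threading.Lock. The sketch below is not the author's code; counter, counter_lock, add_many and run are illustrative names. Using the lock as a context manager releases it even if the body raises.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times):
    global counter
    for _ in range(times):
        with counter_lock:  # acquire() on entry, release() on exit
            counter += 1


def run(n_threads=8, times=10_000):
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(times,)) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


print(run())  # 80000
```

Only the increment is locked, so the threads still overlap everywhere else.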

Do local variables need a lock?

Example with per-thread data (no lock needed):

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/2 8:11
#  @Author:zhangmingda
#  @File: 局部变量无需加锁.py
#  @Software: PyCharm
#  Description:

import threading
import time

class MyThread(threading.Thread):
    def __init__(self,num,sleeptime):
        super(MyThread,self).__init__()
        self.num = num
        self.sleepTime = sleeptime
    def run(self):
        self.num += 1
        time.sleep(self.sleepTime)
        print("thread(%s) , num:%s" %(self.name,self.num))

if __name__ == '__main__':
    t1 = MyThread(100,2)
    t2 = MyThread(200,1)
    t1.start()
    t2.start()

Output

thread(Thread-2) , num:201
thread(Thread-1) , num:101

Process finished with exit code 0

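The code below takes a thread lock around the whole function body, which makes the threads run serially (and defeats the purpose of using threads).

The acquire() method takes a blocking parameter.

  • With blocking=True (the default), the calling thread blocks until it obtains the lock.
  • With blocking=False, the calling thread does not block: acquire() returns immediately, with False if the lock could not be obtained.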
#!/usr/bin/env python
# Author:Zhangmingda
import threading,time

num = 0
lock = threading.Lock() # the thread lock
t_obj = []
def run():
    lock.acquire() # take the lock at the start of every call
    global num
    num +=1
    time.sleep(1)
    lock.release() # release the lock

for i in range(10):
    t = threading.Thread(target=run)
    t.start()
    t_obj.append(t)
for t in t_obj:
    t.join() # wait for every thread to finish
print('num:',num)

# The lock turns the parallel run into a serial one: the total time is about 10 seconds

Output

num: 10

Process finished with exit code 0
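The blocking parameter of acquire() can be tried out in isolation. A minimal sketch (demo_lock and the result names are made up for illustration) that also uses the optional timeout argument available in Python 3:

```python
import threading

demo_lock = threading.Lock()

first = demo_lock.acquire()                 # blocking=True by default; the lock is free, so this succeeds
second = demo_lock.acquire(blocking=False)  # already held: returns False immediately
third = demo_lock.acquire(timeout=0.1)      # blocks for at most 0.1 s, then returns False
demo_lock.release()

fourth = demo_lock.acquire(blocking=False)  # free again, so this succeeds
demo_lock.release()

print(first, second, third, fourth)  # True False False True
```

The return value is what tells a non-blocking caller whether it owns the lock, so it should be checked before touching the protected data.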

 

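A thread lock guarantees that only one thread at a time runs the locked section.

Nested locking easily leads to deadlock. The following example deadlocks by nesting acquisitions of a plain threading.Lock():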
#!/usr/bin/env python
# Author:Zhangmingda
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.Lock() # a plain Lock acquired in a nested way deadlocks
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print('total threads: ',threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

The reentrant lock threading.RLock() supports nested acquisition by the same thread and does not deadlock

#!/usr/bin/env python
# Author:Zhangmingda
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock() # a reentrant lock avoids the nested-lock deadlock
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

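while threading.active_count() != 1:
    print('total threads: ',threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

Explanation: with a plain Lock, each thread blocks forever on the second, nested acquire() and never gets out, so the thread count stays at 11. With a reentrant lock the program runs to completion.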
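The same nesting can be written more compactly with the with statement. This is a sketch rather than the author's code (rlock, totals, bump and bump_both are illustrative names): the inner with re-acquires a lock the thread already owns, which an RLock allows.

```python
import threading

rlock = threading.RLock()
totals = {'a': 0, 'b': 0}


def bump(key):
    with rlock:  # inner acquire by the owning thread: does not block
        totals[key] += 1


def bump_both():
    with rlock:  # outer acquire
        bump('a')
        bump('b')


threads = [threading.Thread(target=bump_both) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(totals)  # {'a': 10, 'b': 10}
```

An RLock must be released as many times as it was acquired, which the nested with blocks take care of automatically.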

 

threading.BoundedSemaphore(<thread count>).acquire() limits how many threads can run at the same time

#!/usr/bin/env python
# Author:Zhangmingda
import threading,time
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print('run the thread:%s'% n)
    semaphore.release()
    
if __name__ == "__main__":
    semaphore = threading.BoundedSemaphore(5) 
    # at most 5 threads run at any moment; the rest wait, and each time one of the 5 finishes another is let in
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()
while threading.active_count() != 1:
    pass
else:
    print('--- all threads done---')

Result: only 5 threads run at a time, so the results appear 5 at a time while the other threads wait.
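The limit can also be measured rather than read off the output. The following sketch (LIMIT, slots, job and the counters are hypothetical names) records the highest number of jobs that were inside the semaphore at the same time; that peak should never exceed the limit.

```python
import threading
import time

LIMIT = 3
slots = threading.BoundedSemaphore(LIMIT)
state_lock = threading.Lock()  # protects the two counters below
running = 0
peak = 0


def job():
    global running, peak
    with slots:  # blocks while LIMIT jobs are already inside
        with state_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # simulated work
        with state_lock:
            running -= 1


threads = [threading.Thread(target=job) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('peak concurrency:', peak)
```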

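Sharing event state between threads: threading.Event()

Example:

Two child threads: one drives a traffic light and one simulates a car. The car stops on red and goes on green.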
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/1 19:10
#  @Author:zhangmingda
#  @File: lighter_car.py
#  @Software: PyCharm
#  Description: threading.Event() is a flag; several threads check the flag and act accordingly

import threading,time

event = threading.Event()
def lighter():
    count = 0
    event.set() # set the flag: the light starts green
    while True:
        if  5 < count  < 10:
            event.clear() # clear the flag: red light
            print('\033[41;1mRed light, please stop\033[0m')
        elif count >= 10: # one cycle every 10 seconds
            count = 0 # reset the counter
        else:
            event.set() # after the first cycle, set the flag again: green light
            print('\033[42;1mGreen light, go ahead\033[0m')
        count += 1
        time.sleep(1)

def car(name):
    while True:
        if event.is_set(): # flag set means green: drive
            print('\033[32;1m %s  green light, driving....\033[0m'%name)

        else:
            print('\033[31;1m %s waiting at the red light...\033[0m'%name)
            event.wait() # block here until the flag is set
        time.sleep(1)

light = threading.Thread(target=lighter,)
light.start()

car1 = threading.Thread(target=car,args=('TesiLa',))
car1.start()
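The traffic light runs forever, so here is a bounded sketch of the same set() / clear() / wait() calls that terminates on its own (green, log and this shorter car function are made up for the illustration). It also shows that wait() accepts a timeout and returns False when the timeout expires.

```python
import threading

green = threading.Event()
log = []


def car(name):
    log.append(name + ' waiting')
    green.wait()  # blocks until some thread calls green.set()
    log.append(name + ' driving')


t = threading.Thread(target=car, args=('car-1',))
t.start()
was_set_before = green.is_set()  # False: nobody has set the flag yet
green.set()                      # turn the light green; the waiting thread wakes up
t.join()

green.clear()                             # back to red
timed_out = not green.wait(timeout=0.05)  # wait() returns False when the timeout expires

print(was_set_before, log, timed_out)
# False ['car-1 waiting', 'car-1 driving'] True
```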


 

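Thread-safe queues: from queue import Queue

Python's queue module (named Queue in Python 2) provides synchronized, thread-safe queue classes: the FIFO (first in, first out) queue Queue, the LIFO (last in, first out) queue LifoQueue, and the priority queue PriorityQueue. Each of them does its own locking internally, so put() and get() behave as atomic operations (either not done at all, or done completely) and can be called directly from multiple threads. Queues can therefore be used to synchronize threads.

The producer-consumer problem implemented with a FIFO queue:

Producers: two producer threads, each of which produces 100 items whenever the stock is below 1000.

Consumers: five consumer threads, each of which consumes 3 items per loop whenever the stock is above 100, i.e. 15 items per second in total.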

import threading
import time

# Python 3
from queue import Queue

# Python 2
# from Queue import Queue

class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count +1
                    msg = 'produced item '+str(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + ' consumed '+queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == '__main__':
    queue = Queue()

    for i in range(500):
        queue.put('initial item '+str(i))
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
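The program above never stops. A common variation, sketched here under my own assumptions (the producer, consumer and run functions and the None sentinel are not from the original), lets the blocking put() and get() calls do the waiting instead of polling qsize(), and shuts the consumers down with one sentinel each.

```python
import threading
from queue import Queue


def producer(q, n_items, n_consumers):
    for i in range(n_items):
        q.put(i)  # blocks while the queue is full (maxsize reached)
    for _ in range(n_consumers):
        q.put(None)  # one sentinel per consumer: "no more work"


def consumer(q, out, out_lock):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        with out_lock:
            out.append(item * 2)


def run(n_items=100, n_consumers=3):
    q = Queue(maxsize=10)
    out, out_lock = [], threading.Lock()
    consumers = [threading.Thread(target=consumer, args=(q, out, out_lock))
                 for _ in range(n_consumers)]
    for c in consumers:
        c.start()
    p = threading.Thread(target=producer, args=(q, n_items, n_consumers))
    p.start()
    p.join()
    for c in consumers:
        c.join()
    return sorted(out)


result = run()
print(len(result), result[:3])  # 100 [0, 2, 4]
```

Because the queue is FIFO, the sentinels are only reached after every real item has been taken, so no work is lost at shutdown.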

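threading.local() returns a thread-local object. It lets every thread use the same variable name while holding its own value.

In a multithreaded program every thread has its own data. Using a local variable is better than using a global one, because a local variable is visible only to its own thread and cannot affect other threads, whereas every modification of a global variable must be protected by a lock.

With threading.local() a single global name gives each child thread its own independent value, as in the example below.

In this example the study action knows the student's name without it being passed in, which is handy when the value is needed deep inside nested function calls.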

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/7/2 10:17
#  @Author:zhangmingda
#  @File: threading_ThreadLocal_study.py
#  @Software: PyCharm
#  Description:

import threading

# create the global thread-local object
local_school = threading.local()


def study():
    # read the value bound to the current thread
    stu = local_school.student
    print("thread:%s  %s is studying" % (threading.current_thread().name,stu))
def student(name):
    # bind this thread's student name to the global thread-local object
    local_school.student = name
    study()

stu1 = threading.Thread(target=student,args=("zhangmingda",),name="zmd")
stu2 = threading.Thread(target=student,args=("tangdongqi",),name="tdq")

stu1.start()
stu2.start()

stu1.join()
stu2.join()

Output:

thread:zmd  zhangmingda is studying
thread:tdq  tangdongqi is studying

Process finished with exit code 0

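The crudest approach is to pass the object through every function call:

def process_student(name):
    std = Student(name)
    # std is a local variable, but every function needs it, so it has to be passed along: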
    do_task_1(std)
    do_task_2(std)

def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)

def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)

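A slightly better approach is a global dict: each thread stores its object in the dict with the thread itself as the key, and every function looks up its own value the same way

global_dict = {}

def std_thread(name):
    std = Student(name)
    # put std into the global dict, keyed by the current thread:
    global_dict[threading.current_thread()] = std
    do_task_1()
    do_task_2()

def do_task_1():
    # std is not passed in; look it up by the current thread:
    std = global_dict[threading.current_thread()]
    ...

def do_task_2():
    # any function can look up the current thread's std:
    std = global_dict[threading.current_thread()]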
    ...
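threading.local() performs this per-thread lookup for you. The sketch below (ctx, seen, handle and report are illustrative names, not part of the original) checks the isolation explicitly: each thread reads back only the value it stored, and the main thread, which never stored anything, has no such attribute.

```python
import threading

ctx = threading.local()
seen = {}
seen_lock = threading.Lock()


def report():
    # Reads the value set by *this* thread; nothing is passed in.
    with seen_lock:
        seen[threading.current_thread().name] = ctx.student


def handle(name):
    ctx.student = name  # stored separately for every thread
    report()


threads = [threading.Thread(target=handle, args=(n,), name='t-' + n)
           for n in ('alice', 'bob')]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(seen.items()))     # [('t-alice', 'alice'), ('t-bob', 'bob')]
print(hasattr(ctx, 'student'))  # False: the main thread never set it
```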
Original article: https://www.cnblogs.com/zhangmingda/p/9240134.html