线程,进程 ,队列 基本用法总结

一、线程(线程是最小的工作单位,同一进程内的线程共享资源)

创建线程:threading模块

创建一个线程:threading.Thread(target=函数名,args=(参数,) )  *这里的args后面必须是元祖,而且当括号内是一个参数时,第一个参数后加逗号

我们利用threading模块创建的线程都是子线程,因为解释器在工作的时候会创建一个主线程来进行工作,我们自己创造的线程都是子线程

import threading
import  time
def f1(a1,a2):
    time.sleep(2)
    print("456")
t = threading.Thread(target = f1,args = (124,111,))
t.setDaemon(True)#不等待子线程执行完成
t.start()
t1 = threading.Thread(target = f1,args = (124,111,))
t1.setDaemon(True)#不等待子线程执行完成
t1.start()
t2 = threading.Thread(target = f1,args = (124,111,))
t2.setDaemon(True)#不等待子线程执行完成
t2.start()


注:主线程是代码从上往下执行(我们看不见),setDaemon默认为False,即主线程等待子线程执行完,一起退出,如果我们设置t2.setDaemon(True),即主线程不等子线程执行完就退出

t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

t.join(参数) :中的参数表示最多的等待时间,如果函数的执行时间小于等待时间,那么就等待你函数的执行时长,如果等待时间大于函数执行时间,那么等待你参数时间后就不等待了

t.setDaemon(True)#是决定是否等待子线程执行完成后再退出,加上表示不等待子线程执行完就退出,不加表示等待子线程后再退出

import threading
import  time
def f1(a1,a2):
    time.sleep(3)
    print("456")
t = threading.Thread(target = f1,args = (124,111,))
t.setDaemon(True)#不等待子线程执行完成
t.start()
t.join(2)
print("222")

注:主线程执行时间是3,而我等待时间是2,所以在我等待的时间内子线程没有执行,所以只执行主线程就退出

 

import threading
import  time
def f1(a1,a2):
    time.sleep(3)
    print("456")
t = threading.Thread(target = f1,args = (124,111,))
t.start()
t.join(2)
print("222")


注:主线程执行时间是3,而我等待时间是2,所以在我等待的时间内子线程没有执行,所以我先执行主线程的再执行你子线程的

二、 threading.Event

Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

  • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
  • Event.set() :将标识位设为Ture
  • Event.clear() : 将标识伴设为False。
  • Event.isSet() :判断标识位是否为Ture。
import threading
import time
def do(event):
    time.sleep(1)
    print('start')
    event.wait()
    print('execute')

event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

event_obj.clear()       #设置为False
inp = input('input:')
if inp == 'true':
    event_obj.set()     #设置为Ture

三、线程锁

由于同一进程下的多个线程共享同一内存地址的资源,所以就有可能出现不同的线程修改同一数据,导致脏数据(不正确的数据)的出现,所以我们应该在一个线程修改的时候加一把锁,等这个线程修改完后下一线程再来修改,所以就引进了线程锁的概念

import threading
import time
globals_num = 0
lock = threading.RLock()
def Func():
    lock.acquire()  # 获得锁
    global globals_num
    globals_num += 1
    time.sleep(1)
    print(globals_num)
    lock.release()  # 释放锁
for i in range(5):
    t = threading.Thread(target=Func)
    t.start()

注:每一个线程执行的时候都加一把锁,就不会出现脏数据,只有前面一个线程执行完,解锁以后后面一个线程才能获得变量并对其进行修改,所以输出顺序是每一个输出都等待1s,而且输出顺序递增

4、队列

特性:先进先出,单项队列只能一进一出

创建队列 :queue.Queue(参数)参数表示队列的容量即最多容纳的个数

q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。

q.join()    # 等到队列为空的时候,在执行别的操作
q.qsize()   # 返回队列的大小 (不可靠)
q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
q.put(item, block=True, timeout=None) #  表示往队列里面放
q.get(block=True, timeout=None) # 表示从队列里面取值,如果队列里面没值是会一直等即阻塞 
q.put_nowait(item) #   等效于 put(item,block=False)
q.get_nowait() #    等效于 get(item,block=False) 表示如果队列里面没值时不等

生产者--消费者 模型

import queue
import threading
message = queue.Queue(10)
def producer(i):  #每一个生成的线程都一直往队列里面放对应生成的值
    while True:
        message.put(i)
def consumer(i):  ##每一个生成的线程都一直从队列里面获取生成的值
    while True:
        msg = message.get()
        print(msg)
for i in range(12):
    t = threading.Thread(target=producer, args=(i,)) #生成12个线程共同执行producer函数
    t.start()
for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))  # #生成10个线程共同执行consumer函数
    t.start()
import queue
import threading
message = queue.Queue(10)
def producer(i):   
        message.put(i)  #每一个线程把各自生成的数放到队列中,一共放了5个
def consumer(i):
        msg = message.get() #一共有10个线程过来获取,只获取了5个就没有了,剩下的就一直等待
        print(msg)
for i in range(5):
    t = threading.Thread(target=producer, args=(i,))
    t.start()
for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
import queue
import threading
message = queue.Queue(10)
def producer(i):
        message.put(i)  #每一个线程把各自生成的数放到队列中,一共放了5个
def consumer(i):
        msg = message.get_nowait()  #一共有10个线程过来获取,只获取了5个就没有了,不等待直接退出
        print(msg)
for i in range(5):
    t = threading.Thread(target=producer, args=(i,))
    t.start()
for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

 五、进程

创建进程:multiprocessing模块

创建一个进程:multiprocessing.Process(target=函数名,args=(参数,) ) *这里的args后面必须是元祖,而且当括号内是一个参数是,第一个参数后加逗号

在windos下不支持fork()的创建,如果想用进程是必须把对进程的操作写在 if  __name__=="__main__"下,在linux下不用

我们利用multiprocessing模块创建的线程都是子进程,因为解释器在工作的时候会创建一个主线程(包含在主进程中)来进行工作,我们自己创造的进程都是子进程

import  time
import multiprocessing
def f1(a1,a2):
    time.sleep(2)
    print("456")
if __name__ == '__main__':
    t = multiprocessing.Process(target = f1,args = (124,111,))
    t.daemon = True#不等待子进程执行完成
    t.start()
    t1 = multiprocessing.Process(target = f1,args = (124,111,))
    t1.daemon = True#不等待子进程执行完成
    t1.start()
    print("end")


end

注:主线程是代码从上往下执行(我们看不见),daemon默认为False,即主叫进程等待子进程执行完,一起退出,如果我们设置t1.daemon = True,即主进程不等待子进程执行完就退出
import  time
import multiprocessing
def f1(a1,a2):
    time.sleep(2)
    print("456")
if __name__ == '__main__':
    t = multiprocessing.Process(target = f1,args = (124,111,))
    t.daemon = True#不等待子进程执行完成
    t.start()
    print("111")
    t.join(1)
    print("end")

111
end

注:先执行print("111") 在等待1函数没执行就不等待了,直接执行print("end")

 六、

由于多个进程有自己的内存资源,所以修改只是修改了各自的东西,所以不存在线程锁的概念

import multiprocessing
li = []
def f(a):
    li.append(a)  #不同的进程往不同的进程里面的li中添加元素
    print(li)
if __name__ =="__main__":
    for a in range(5):
        t =  multiprocessing.Process(target=f,args=(a,))
        t.start()

[0]
[2]
[1]
[3]
[4]

进程间的数据共享 

如果我们想让俩个不同的进程共同修改同一内容的话,即如果你真有需要 要共享数据, multiprocessing提供了两种方式。

1、Array

from multiprocessing import Process,Array

temp = Array('i', [11,22,33,44])
def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print(i,'----->',item)
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()

2、Manager

import time
from  multiprocessing import Process,Manager
def f(i,dic):
    dic[i] = 100 + i
    print(dic)
if __name__  == "__main__":
    manage = Manager()
    dic = manage.dict()
    for i in range(5):
        p = Process(target= f,args=(i,dic,))
        p.start()
        p.join()  #必须要加,要不就找不到文件了

{0: 100}
{0: 100, 1: 101}
{0: 100, 1: 101, 2: 102}
{0: 100, 1: 101, 2: 102, 3: 103}
{0: 100, 1: 101, 2: 102, 3: 103, 4: 104}

 进程池

 由于线程和进程并不是越多越好,而是应该创建最优的数量,所以就有了进程池的概念,创建一个进程池并不会立即创建进程,而是根据需要从进程池里面取的时候才会创建进程

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply 
  • apply_async
from  multiprocessing import Process,Pool
import time
def Foo(i):
    time.sleep(2)
    return i+100
def Bar(arg):
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)
    #print pool.apply(Foo,(1,))
    #print pool.apply_async(func =Foo, args=(1,)).get()
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
    print('end')
    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。


注:申请一个进程执行Foo方法,Foo方法没执行完,Bar方法永远不触发,只有Foo方法执行完了,才触发Bar方法,并将其返回值赋值给Bar方法的参数

  回调函数:用于验证主函数是否执行完

apply :1、从进程池中申请进程

     2、每次申请的一个线程总都自带一个join

     3、特点是申请一个执行一个,完了再申请再执行

     4、因为每一个申请的进程都有一个join,所以任务执行过程是排队进行的

     5、申请的次数和进程池中进程的数量没有关系,因为我每次申请一个,等执行完第一个再执行下一个

from multiprocessing import Pool
import time
def f1(a):
    time.sleep(1)
    print(a)
if __name__ == "__main__":
    pool = Pool(5)
    for i in range(6):
        T = pool.apply(func=f1,args=(i,))
        print("222222222222")

0
222222222222

1
222222222222

2
222222222222

3
222222222222

4
222222222222

5
222222222222

apply_async:1、从进程池中申请进程

        2、每次申请的一个线程都没有join,所以申请以后不等待执行就申请下一个

        3、如果申请的数量大于进程池中进程的数量,则每次申请的进程数就是进程池中的进程数,等待执行完毕,进程回到进程池后才可以从新申请

        4、每一个任务都是并发执行的,而且可以设置回调函数

        5、进程池的执行默认不等待子进程的执行,所以,如果主进程执行完毕后子进程没执行完就直接退出了,所以我们应该在主进程执行后加一个join,这样主进程                                    就会等待子进程执行完毕后一起退出(自己瞎猜的O(∩_∩)O哈哈~)

from multiprocessing import Pool
import time
def f1(a):
    time.sleep(1)
    print(a)
if __name__ == "__main__":
    pool = Pool(5)
    for i in range(9):
        pool.apply_async(func=f1,args=(i,))
        print("222222222222")
    pool.close() 
    pool.join() #进程池的方法

222222222222
222222222222
222222222222
222222222222
222222222222
222222222222
222222222222
222222222222
222222222222


0
1
2
3
4


5
6
7
8
原文地址:https://www.cnblogs.com/luxiaojun/p/5601889.html