十六.多线程

0.Python多线程,不适合cpu密集操作型的任务,如大量的计算。适合io操作密集型的任务。大量的高密度运算,多线程不一定提高效率,多线程适合轻量级多个任务。

1._thread的使用

#不建议使用_thread模块,因为主线程退出后,其他线程没有清理直接退出。而使用threading模块,会确保在所有重要的子线程退出前,保持整个进程存活。

import _thread
from time import sleep, ctime
def loop0():
    print('start loop 0 at:', ctime())
    sleep(4)
    print('loop 0 done at:', ctime())
def loop1():
    print('start loop 1 at:', ctime())
    sleep(2)
    print('loop 1 done at:', ctime())
def main():
    print('starting at:', ctime())
    _thread.start_new_thread(loop0,())
    _thread.start_new_thread(loop1,())
    sleep(6) #没有这句话就直接结束了,相当于多线程等待
    print('all DONE at:', ctime())
if __name__ == '__main__':
    main()

#下面不需要特别指定等待时间,本质是通过锁对象的状态监测来执行等待
import _thread as thread
from time import sleep, ctime
SleepLoops = [4, 2] #休息时间
def Func(nloop, nsec, lock):
    print('start loop', nloop, 'at:', ctime())
    sleep(nsec)
    print('loop', nloop, 'done at:', ctime())
    lock.release() #针对锁对象使用,改变锁的状态,释放锁
def main():
    print('starting threads...')
    #锁住线程的对象
    LocksHandle = []
    #对象数量
    nSleep = range(len(SleepLoops))
    #建立锁对象
    for i in nSleep:
        lock = thread.allocate_lock() #得到锁对象
        lock.acquire() #取得锁对象
        LocksHandle.append(lock)
    #执行功能函数
    for i in nSleep:
        #传递3个参数到新线程的功能函数中
        thread.start_new_thread(Func, (i, SleepLoops[i], LocksHandle[i]) )
    #本质通过状态监测来处理多线程等待问题
    for i in nSleep:
        #如果锁对象是锁住状态,一直等待
        while LocksHandle[i].locked():
            pass
    print('all DONE at:', ctime())
if __name__ == '__main__':
    main()
View Code

2.threading的使用

#threading模块不需要繁琐的设置就可以轻松关闭线程。

#可调用的函数实例----------------------------------------------------
import threading
import time
SleepLoops = [2, 1] #休息时间
#定义功能函数
def loop(nloop, nsec):
    print('start loop', nloop, 'at:', time.ctime())
    time.sleep(nsec)
    print('loop', nloop, 'done at:', time.ctime())
#主要的多线程函数
def main():
    print('starting at:', time.ctime())
    threads = [] #储存线程句柄
    nSleep = range(len(SleepLoops))
    #传递函数到线程,并且添加线程句柄,这里不执行功能函数。
    for i in nSleep:
        t = threading.Thread(target=loop,args=(i, SleepLoops[i]))
        threads.append(t)
    #开始线程,cup会执行功能函数
    print("开始执行功能函数")
    for i in nSleep:
        threads[i].start()
    #join()函数让线程在执行完后自动结束线程,不需要去设置繁琐的状态监控。
    for i in nSleep:            # wait for all
        threads[i].join()       # threads to finish
    print('all DONE at:', time.ctime())

if __name__ == '__main__':
    main()

#可调用的类实例-----------------------------------------------------
import threading
from time import sleep, ctime
SleepLoops = [2, 1]
class ThreadFunc(object):
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args
    def __call__(self):
        self.func(*self.args)
def loop(nloop, nsec):
    print('start loop', nloop, 'at:', ctime())
    sleep(nsec)
    print('loop', nloop, 'done at:', ctime())

def main():
    print('starting at:', ctime())
    threads = [] #存档线程句柄
    n = range(len(SleepLoops))
    for i in n:        # create all threads
        #本质是调用类中__call__
        t = threading.Thread(target=ThreadFunc(loop, (i, SleepLoops[i]),loop.__name__))
        threads.append(t)
    print("开始执行功能类")
    for i in n:        # start all threads
        threads[i].start()
    for i in n:        # wait for completion
        threads[i].join()
    print('all DONE at:', ctime())

if __name__ == '__main__':
    main()

#子类的实例---------------------------------------------------
import threading
from time import sleep, ctime
SleepLoops = [2, 1]
class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self, name=name)
        self.func = func
        self.args = args
    #类似于__call__(),这里必须要写成run()
    def run(self):
        self.func(*self.args)
def Func(nloop, nsec):
    print('start loop', nloop, 'at:', ctime())
    sleep(nsec)
    print('loop', nloop, 'done at:', ctime())

def main():
    print('starting at:', ctime())
    threads = []
    n = range(len(SleepLoops))
    for i in n:
        t = MyThread(Func, (i, SleepLoops[i]),Func.__name__)
        threads.append(t)
    print("开始执行子类")
    for i in n:
        threads[i].start()
    for i in n:
        threads[i].join()
    print('all DONE at:', ctime())

if __name__ == '__main__':
    main()

3.threading建立模块(MyThread.py)

import threading
from time import time, ctime

class MoreThread(threading.Thread):
    def __init__(self, func, args, name='', verb=False):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        self.verb = verb

    def getResult(self):
        return self.res

    def run(self):
        if self.verb:
            print('starting', self.name, 'at:', ctime())
        self.res = self.func(*self.args)
        if self.verb:
            print(self.name, 'finished at:', ctime())

4.单线程与多线程的比较

import MyThread
from time import ctime, sleep

def fib(x):
    sleep(0.005)
    if x < 2: return 1
    return (fib(x-2) + fib(x-1))
def fac(x):
    sleep(0.1)
    if x < 2: return 1
    return (x * fac(x-1))
def sum(x):
    sleep(0.1)
    if x < 2: return 1
    return (x + sum(x-1))

funcs = (fib, fac, sum)
n = 12

def main():
    nfuncs = range(len(funcs))

    print('*** SINGLE THREAD')
    for i in nfuncs:
        print('starting', funcs[i].__name__,'at:', ctime())
        print(funcs[i](n))
        print(funcs[i].__name__, 'finished at:',ctime())

    print('
*** MULTIPLE THREADS')
    threads = []
    for i in nfuncs:
        t = MyThread.MoreThread(funcs[i], (n,),funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
        print(threads[i].getResult())

    print('all DONE')

if __name__ == '__main__':
    main()
View Code

5.锁Lock与可重复锁RLock

#锁能够防止多个线程同时进入公共临界区
import threading,time
num = 0
#创建lock对象,这句必须要有,指向一个锁。且只能acquire()一次------------------------------------
lock = threading.Lock()
def run(n):
    #如果不加入锁,print的内容会混乱
    lock.acquire() #获得锁,锁住状态。每次只能有一个线程访问
    global num
    print("first",n,num)
    num +=1
    print("second",n,num)
    time.sleep(0.01)
    lock.release() #释放锁,没有锁状态。排队的进程可以访问。
    #或者用with语句也可以
    with lock:
        print("third", n, num)
        num += 1
        print("fourth", n, num)
        time.sleep(0.01)
t_objs = [] #存线程实例
for i in range(50):
    t = threading.Thread(target=run,args=(i,))
    t.start()
    t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
    t.join()
print("----------all threads has finished...",threading.current_thread(),threading.active_count())
print("num:",num)

#----------------------------------------------------------------------------------------------
import threading, time
num, num2 = 0, 0
Relock = threading.RLock() #RLock()可以多次上锁,且不会阻塞----------------------------------------
def run1():
    print("grab the first part data")
    Relock.acquire()
    global num
    num += 1
    Relock.release()
    return num

def run2():
    print("grab the second part data")
    Relock.acquire()
    global num2
    num2 += 1
    Relock.release()
    return num2

def run3():
    Relock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    Relock.release()
    print(res, res2)

for i in range(1):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

6.信号量

#对于有限资源的应用,使用信号量是个不错的选择
#threading.BoundedSemaphore()限制多线程的访问
import threading, time
# 最多允许5个线程同时运行:0-4运行完,才可以运行下一组
semaphore = threading.BoundedSemaphore(5)
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s
" % n)
    semaphore.release()
    with semaphore:
        time.sleep(1)
        print("run the thread: %s
" % n)

if __name__ == '__main__':
    for i in range(22):
        t = threading.Thread(target=run, args=(i,))
        t.start()
while threading.active_count() != 1:
    pass  # print(threading.active_count())
else:
    print('----all threads done---')
    #print(num)

7.队列queue

from time import sleep
from queue import Queue,PriorityQueue
from MyThread import MoreThread

def writer(queue, loops):
    for i in range(loops):
        print('producing object for Q...')
        queue.put('xxx', 1)
        print("size now", queue.qsize())

def reader(queue, loops):
    for i in range(loops):
        val = queue.get(1)
        print('consumed object from Q... size now', queue.qsize())

funcs = [writer, reader]
nfuncs = range(len(funcs))

def main():
    nloops = 5
    q = Queue(32) #建立一个先入先出队列
    threads = []
    for i in nfuncs:
        t = MoreThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
    print('all DONE')

if __name__ == '__main__':
    main()

#优先级队列
q = PriorityQueue()
q.put((-1,"A"))
q.put((3,"B"))
q.put((10,"C"))
q.put((6,"D"))
print(q.get())
print(q.get())
print(q.get())
print(q.get())
原文地址:https://www.cnblogs.com/i201102053/p/10686659.html