python之路(13)线程

前言

  对于python来说,因为有DIL锁的存在,在同一个进程中,一个cpu中同一时刻只能运行一个线程,无法并行,只能并发,但是python可以创建多个进程,每个进程可以分别占用一个cpu来运行线程,实现并行,但是创建多个进程很占用内存。

  并发:可以运行多个程序,不是同时进行进行

  并行:可以同时运行多个程序

  同步和异步:同步是一个任务遇到io等阻塞,一直等待用户完成操作,再执行下面的程序;异步如果遇到io等阻塞,先执行下面的程序,一旦等到用户完成操作,在回来执行之前的程序。

目录


线程创建和常用方法

 方法一:

import time

def Hi(name):
    print('你好:%s'%name)
    time.sleep(2) #睡两秒

if __name__ == '__main__':
    #创建了两个子线程
    # target是目标函数,args是函数的参数(以元组的形式传入)
    t1=threading.Thread(target=Hi,args=("chen",)) 
    t2=threading.Thread(target=Hi,args=('liu',))

    t1.start()
    t2.start()

 方法二:通过继承

import threading
import time

#通过类继承
class MyThread(threading.Thread):
     def __init__(self,name):
         threading.Thread.__init__(self)
         self.name = name

     def run(self): #开启线程后,默认执行run方法
         print("你好:%s
"%self.name)
         time.sleep(3)

if __name__=='__main__':
    t1 = MyThread('chen')
    t2 = MyThread('liu')
    t1.start()
    t2.start()
    print('endind....')

 join():一旦子线调用了这个方法,主线程需要等子线程运行完了再运行主线程,不让主线程和子线程是同时运行的。

setDaemon(True):一旦子线程调用这个方法,就将子线程设置为守护线程(随着主线程的结束而结束)。 注:要在start()之前调用

 同步锁

  当不同线程同时使用一个全局变量的时候,就会出现数据混乱,同步锁的作用是,给线程的一段代码使用同步锁,在这段代码执行时,同一时刻只有一个线程在运行,也就是说加了同步锁的这段代码是串行的

import threading
import time

def sub():

    global num

    lock.acquire() #添加同步锁
    temp = num
    time.sleep(0.001)
    num = temp-1
    lock.release() #释放同步锁

num = 100
l = []

lock = threading.Lock() #创建同步锁

for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)

for t in l:
    t.join()

print(num) 

递归锁

  在同步锁中,会产生一个问题,如果在两个线程中,它们互相需要对方的资源,但是双方又不能先把资源给对方,这样就会一直卡住,这样就引入递归锁。

import  threading
import time


class MyThread(threading.Thread):

    def actionA(self):

        A.acquire() #请求A的同步锁
        print("%s gotA %s
"%(self.name,time.ctime()))
        time.sleep(2)

        B.acquire() #请求B的同步锁
        print("%s gotB %s
"%(self.name,time.ctime()))
        time.sleep(1)
        B.release() #释放A的同步锁

        A.release() #释放B的同步锁


    def actionB(self):

        B.acquire()#请求B的同步锁
        print("%s gotA %s
"%(self.name,time.ctime()))
        time.sleep(2)
        A.acquire() #请求A的同步锁
        print("%s gotB %s
"%(self.name,time.ctime()))
        time.sleep(1)
        A.release() #释放A的同步锁
        B.release() #释放B的同步锁


    def run(self):
        self.actionA()
        time.sleep(0.5)
        self.actionB()

if __name__ == '__main__':
    A=threading.Lock()
    B=threading.Lock()
    L=[]
    for i in range(5):
        t=MyThread()
        t.start()
        L.append(t)

    for i in L:
        i.join()

    print("ending....")

#######################################################
Thread-1 gotA Sat May  4 12:05:52 2019

Thread-1 gotB Sat May  4 12:05:54 2019

Thread-1 gotA Sat May  4 12:05:55 2019

Thread-2 gotA Sat May  4 12:05:55 2019

卡住......

  线程1首先完成actionA()方法,然后别的线程开始执行,也就是说,这个时候,线程2的actionA()方法和线程1的actionB()方法同时执行,然后线程1中的actionA()的方法拿到A的同步锁,此时线程2中的actionB()拿到B的同步锁,之后线程1请求B的同步锁,但是B锁已经被线程1给拿走了,同样的线程1请求A的同步锁,但是A锁已经被线程2给拿走了,此时两个线程都等对方先释放锁,就卡住 了。

因此使用递归锁

import  threading
import time


class MyThread(threading.Thread):

    def actionA(self):

        r_lcok.acquire() #count=1
        print(self.name,"gotA",time.ctime())
        time.sleep(2)
        r_lcok.acquire() #count=2
        print(self.name, "gotB", time.ctime())
        time.sleep(1)
        r_lcok.release() #count=1
        r_lcok.release() #count=0


    def actionB(self):

        r_lcok.acquire()
        print(self.name, "gotB", time.ctime())
        time.sleep(2)
        r_lcok.acquire()
        print(self.name, "gotA", time.ctime())
        time.sleep(1)
        r_lcok.release()
        r_lcok.release()

    def run(self):
        self.actionA()
        self.actionB()


if __name__ == '__main__':

    # A=threading.Lock()
    # B=threading.Lock()
    r_lcok=threading.RLock() #使用递归锁
    L=[]
    
    for i in range(5):
        t=MyThread()
        t.start()
        L.append(t)

    for i in L:
        i.join()

    print("ending....")  

递归锁里内部维护这一个计数器count,初始值为0,当请求锁的时候就加1,释放锁的时候就减1,只要计数器大于零,就没有别的线程可以拿到这把锁 

 同步对象(event)

  相当于一个线程标记(flag)

#创建event对象
event = threading.Event()
event.wait() #相当于一个阻塞状态,直到别的线程event.set()
event.set() #先当与告诉别的已经wait()的线程不用再阻塞了
event.clear() #清除event,用来重新wait()  

 

信号量

  设置同一时刻线程的执行数目

import threading,time

class myThread(threading.Thread):
    def run(self):

        if semaphore.acquire(): #设置信号量
            print(self.name)
            time.sleep(3)
            semaphore.release() #释放信号量

if __name__=="__main__":
    semaphore=threading.Semaphore(5) #设置5把锁,只有5个线程可以进入,同一时刻只能执行5个线程

    thrs=[]
    for i in range(100): #这里开启了100个子线程,如何不设置信号量,那100个子线程将同时执行
        thrs.append(myThread())
    for t in thrs:
        t.start()  

 线程队列

import queue      #  线程 队列

q=queue.Queue()  # 创建一个队列,可以设置队列里的最大存放个数

q.put(12)
q.put("hello")
q.put({"name":"yuan"}) #在put时,如果队列的值是满的,就会卡住,直到别的线程将值取走,队列里有空位了,才将值放进队列里

while 1:
    data=q.get() #如果这里的get()是里面没有值的话,就会一直卡住,直到比的线程往队列里面put()值
    print(data)
    print("----------")

  put('chen',False)  可以设置如果队列里是满的,就报错,同样 get(block=False) 也是一样

 q=queue.LifoQueue() 创建后进先出的队列,相当于栈

 q=queue.PriorityQueue() 创建按照优先级决定进出的队列  

import queue      #  线程 队列

q=queue.PriorityQueue()  # 创建一个优先级队列

q.put([3,12]) #列表带着优先级,数字越小,优先级越高
q.put([2,"hello"])
q.put([4,{"name":"yuan"}])

while 1:
    data=q.get()
    print(data[1])
    print("----------")
############################
hello
----------
12
----------
{'name': 'yuan'}
----------

 q.qsize() 返回队列的大小

 q.empty() 如果队列为空,返回true,如果队列不会空,返回false

 q.get_nowait() 相当于 q.get(block=False) ,不等待,直接报错

 q.task_done()  会给 q.join()发信号

 q.join() 等待接收来自 q.task_done() 的信号,不然一直卡住 

 生产者和消费者模型

  在多线程开发过程中,如果生产数据的线程处理的速度很快,但是消费数据的线程处理的很慢,那么就必须等到消费数据的线程处理完,才可以进行生产数据,反之,如果生产数据的处理速度小于消费数据的处理速度,那么消费线程就必须等待生产线程。

  因此生产者和消费者模型是在彼此之间不做任何通信,通过阻塞队列来进行通信,生产线程生产完数据后,直接扔给阻塞队列,而消费者直接从阻塞队列里区,阻塞队列就相当于一个缓冲区。(解耦问题)

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(5)
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    q.join()
    print("ok......")

def Consumer(name):
  count = 0
  while count <10:
        time.sleep(random.randrange(4))
    # if not q.empty():
    #     print("waiting.....")
        #q.join()
        data = q.get()
        print("eating....")
        time.sleep(4)

        q.task_done()
        #print(data)
        print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
    # else:
    #     print("-----no baozi anymore----")
        count +=1

p1 = threading.Thread(target=Producer, args=('A君',))
c1 = threading.Thread(target=Consumer, args=('B君',))
c2 = threading.Thread(target=Consumer, args=('C君',))
c3 = threading.Thread(target=Consumer, args=('D君',))

p1.start()
c1.start()
c2.start()
c3.start()
View Code
原文地址:https://www.cnblogs.com/shuzhixia/p/10806360.html