python线程

Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

import threading, time


def f1(arg,a):
    print(arg,a)
    time.sleep(2)
    print('老司机等等我')

f1(111, 'zxc')

for i in range(5):
    t = threading.Thread(target=f1, args=(123,'qwe',))
    t.setDaemon(True)   #为True表示主线程不等待子线程,默认为False
    t.start()
t.join(2)    #表示主线程执行到此处时,等待子线程,参数为等待超时时间,可为空
print('老司机开车到站了')
View Code

上述代码中创建了10个线程,然后控制器就交给了CPU,CPU根据制定算法进行调度,分片执行指令。
更多方法:

  start:  线程准备就绪,等待CPU调度

  setName:为线程设置名称

  getName:获取线程名称

  setDaemon:设置为后台线程或前台线程(默认为前台线程)。如果为后台线程,主线程执行完毕后,后台线程不论成功与否,均停止。如果为前台线程,主线程执行完毕后,等待前台线程也执行完毕,程序停止。

  join  :当每个线程执行完毕后继续往下执行,该方法使得多线程变得无意义

  run  :线程被CPU调度后自动执行线程对象的run方法。

import threading, time


def f1(arg,a):
    print(arg,a)
    time.sleep(2)
    print('老司机等等我')

f1(111, 'zxc')

for i in range(5):
    t = threading.Thread(target=f1, args=(123,'qwe',))
    t.setDaemon(True)   #为True表示主线程不等待子线程,默认为False
    t.start()
t.join(2)    #表示主线程执行到此处时,等待子线程,参数为等待超时时间,可为空
print('老司机开车到站了')
示例

线程锁:

由于线程之间是进行随机调度,并且每个线程可能执行n条之后,当多个线程同事修改同一条数据时可能会出现脏数据,所以,出现了线程锁,即同一时刻允许一个线程执行操作。

import threading, time

NUM = 10


def func(l):
    global NUM
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    l.release()
lock = threading.RLock()

for i in range(9):
    t = threading.Thread(target=func, args=(lock,))
    t.start()
普通锁

在上述代码中如果不使用线程锁,那么输出结果将全部是1.

信号量(Semaphore):

互斥锁同时只允许一个线程更改数据,而信号量同事允许一定数量的线程更改数据。

NUM = 10


def func(i, l):
    global NUM
    l.acquire()
    NUM -= 1
    #time.sleep(2)
    print(i, NUM)
    l.release()
lock = threading.BoundedSemaphore(5)    #最多允许5个线程同时运行

for i in range(30):
    t = threading.Thread(target=func, args=(i,lock,))
    t.start()
信号量

事件(event):

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。事件处理的机制:全局定义了一个flag,如果flag为False,那么执行程序执行event.wait方法时就会阻塞,如果flag值为True,那么event.wait方法就不阻塞。

def func(i, e):
    print(i)
    e.wait()         #检测event是什么状态,如果是clear状态则在此等待
    print(i+100)

event = threading.Event()

for i in range(30):
    t = threading.Thread(target=func, args=(i, event,))
    t.start()

event.clear()       #表示锁住线程
inp = input(">>:")
if inp:
    event.set()     #表示解锁线程
事件


条件(Condition)

使得线程等待,只有满足条件时,才释放n个线程

def func(i, con):
    print(i)
    con.acquire()
    con.wait()
    print(i+10)
    con.release()

c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func, args=(i,c,))
    t.start()

while True:
    inp = input('>>:')
    if inp == 'q':break
    c.acquire()
    c.notify(int(inp))
    c.release()
条件
def condition():
    ret = False
    inp = input('>>:')
    if inp:ret = True
    return ret


def func(i, con):
    print(i)
    con.acquire()
    con.wait_for(condition)
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i, c,))
    t.start()
条件2

Timer:

定时器,指定n秒后执行某操作

from threading import Timer

def hello():
    print('hello')

t = Timer(1, hello)
t.start()
Timer

线程池:

 自定义线程池:

import queue, threading, time


class ThreadPool(object):
    def __init__(self, maxsize=5):
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)

    def get_thread(self):
        return self._q.get()

    def put_thread(self):
        self._q.put(threading.Thread)

pool = ThreadPool(5)


def task(arg, p):
    print(arg)
    time.sleep(1)
    p.put_thread()

for i in range(20):
    t = pool.get_thread()
    obj = t(target=task, args=(i, pool))
    obj.start()
  1 import time, threading, queue, contextlib
  2 
  3 StopEvent = object()
  4 
  5 
  6 class ThreadPool(object):
  7     def __init__(self, max_num, max_task_num=None):
  8         if max_task_num:
  9             self.q = queue.Queue(max_task_num)
 10         else:
 11             self.q = queue.Queue()
 12             self.max_num = max_num
 13             self.cancel = False
 14             self.terminal = False
 15             self.generate_list = []     # 当前已经创建了多少线程
 16             self.free_list = []         # 当前空闲线程
 17 
 18     def run(self, func, args, callback=None):
 19         """
 20         线程池执行一个任务
 21         :param func: 任务函数
 22         :param args: 任务函数所需参数
 23         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
 24         :1、任务函数执行状态;2、任务函数返回值(默认为None,即不执行回调函数)
 25         :return: 如果线程池已经终止,则返回True,否则返回None
 26         """
 27         if self.cancel:
 28             return
 29         # 判断已经创建的线程是否小于最大线程数和空闲的线程存在与否
 30         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 31             self.generate_thread()  # 创建线程
 32         w = (func, args, callback,)
 33         self.q.put(w)   # 将任务放到队列里
 34 
 35     def generate_thread(self):
 36         """
 37         创建一个线程 
 38         """
 39         t = threading.Thread(target=self.call)
 40         t.start()
 41 
 42     def call(self):
 43         """
 44         循环去获取任务函数并执行函数 
 45         """
 46         current_thread = threading.currentThread
 47         self.generate_list.append(current_thread)
 48 
 49         event = self.q.get()
 50         while event != StopEvent:
 51             func, arguments, callback = event
 52             try:
 53                 result = func(*arguments)
 54                 success = True
 55             except Exception as e:
 56                 success = False
 57                 result = None
 58 
 59             if callback is not None:
 60                 try:
 61                     callback(success, result)
 62                 except Exception as e:
 63                     pass
 64             # 执行完以上函数后,即任务已经执行完,将free_list添加线程
 65             with self.worker_state(self.free_list, current_thread):
 66                 if self.terminal:
 67                     event = StopEvent
 68                 else:
 69                     event = self.q.get()
 70         else:
 71             self.generate_list.remove(current_thread)
 72 
 73     def close(self):
 74         """
 75         执行完所有的任务后,所有线程停止
 76         """
 77         self.cancel = True
 78         full_size = len(self.generate_list)
 79         while full_size:
 80             self.q.put(StopEvent)
 81             full_size -= 1
 82 
 83     def terminate(self):
 84         """
 85         无论是否还有任务,终止线程
 86         """
 87         self.terminal = True
 88         while self.generate_list:
 89             self.q.put(StopEvent)
 90         self.q.empty()
 91 
 92     @contextlib.contextmanager
 93     def worker_state(self, state_list, worker_thread):
 94         """
 95         用于记录线程中正在等待的线程数
 96         """
 97         state_list.append(worker_thread)
 98         try:
 99             yield
100         finally:
101             state_list.remove(worker_thread)
102 
103 pool = ThreadPool(5)
104 
105 
106 def callback(status, result):
107     pass
108 
109 
110 def action(i):
111     print(i,'hhee')
112 
113 for i in range(30):
114     pool.run(action, (i,), callback)
115 
116 
117 time.sleep(5)
118 print(len(pool.generate_list), len(pool.free_list))
119 print(len(pool.generate_list), len(pool.free_list))
120 pool.close()
121 pool.terminate()
高级线程池
原文地址:https://www.cnblogs.com/caibao666/p/6763446.html