写个线程池

#!/usr/bin/env python

#encoding=utf-8

import threading

import time

import datetime

import Queue

import logging

import sys,os,os.path

#def get_logger(level=logging.DEBUG):

#    logger = logging.getLogger()

#    hdlr2 =logging.StreamHandler(sys.stdout)

#    hdlr = logging.FileHandler(os.path.abspath(os.path.join(os.path.dirname(__file__),"../logs/crawler_agent.log")))

#    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

#    hdlr.setFormatter(formatter)

#    logger.addHandler(hdlr)

#    logger.addHandler(hdlr2)

#    logger.setLevel(level)

#    return logger

#

#get_logger()

def f(a, b, delay=10):
    """Demo job: block for ``delay`` seconds, then print ``a`` and ``b``.

    Args:
        a: First value to print.
        b: Second value to print.
        delay: Seconds to sleep before printing. Defaults to 10, the value
            that used to be hard-coded, so existing two-argument callers
            behave as before.
    """
    time.sleep(delay)
    # A single parenthesised argument prints "a b" under both the Python 2
    # print statement and the Python 3 print function.
    print("%s %s" % (a, b))

class Priority:
    """Named priority levels for jobs handed to ``ThreadPool.add_job``.

    Jobs are stored in a ``Queue.PriorityQueue``, which hands back the
    smallest entry first, so a lower number is served earlier.
    """

    # Ordinary jobs.
    default = 0

    # Served ahead of ``default`` jobs because it is numerically smaller.
    important = -1

class MyThread(threading.Thread):
    """Pool worker that runs queued jobs until the queue stays empty.

    Every queue entry is a ``(priority, callable, args)`` tuple; the worker
    calls ``callable(*args)``. When no job arrives within ``idle_timeout``
    seconds the worker removes itself from the shared ``threads`` registry
    and exits.
    """

    def __init__(self, queue, threads, lock, idle_timeout=10):
        """Create a worker.

        Args:
            queue: Queue the jobs are taken from (``get(block, timeout)``
                must raise ``Queue.Empty`` on timeout).
            threads: Dict mapping thread name -> thread, shared with the
                pool; the worker deletes its own entry on exit.
            lock: Lock guarding ``threads``.
            idle_timeout: Seconds to wait for a job before giving up.
                Defaults to 10, the value that used to be hard-coded.
        """
        threading.Thread.__init__(self)
        self.queue = queue
        self.threads = threads
        self.lock = lock
        self.idle_timeout = idle_timeout

    def run(self):
        """Process jobs until the queue has been empty for ``idle_timeout``."""
        try:
            while True:
                # Only the ``get`` is guarded by ``Queue.Empty`` so that a
                # job which itself raises ``Queue.Empty`` is not mistaken
                # for an idle timeout.
                try:
                    _priority, job, args = self.queue.get(
                        True, self.idle_timeout)
                except Queue.Empty:
                    print("empty exception")
                    print("exit the thread")
                    break
                try:
                    job(*args)
                except Exception:
                    # One failing job must not take the worker down and
                    # strand the jobs still waiting in the queue.
                    logging.exception("job %r failed", job)
                else:
                    logging.debug("finished one job")
        finally:
            # Always unregister, even on unexpected errors, so the pool's
            # thread count never includes a dead worker.
            self._clear()

    def _clear(self):
        """Remove this worker from the shared registry and report it."""
        print(self.name + " leaving")
        with self.lock:
            # ``pop`` with a default tolerates a worker that was never
            # registered; ``with`` guarantees the lock is released.
            self.threads.pop(self.name, None)
            # Snapshot under the lock so printing cannot race with other
            # threads mutating the registry.
            remaining = dict(self.threads)
        print(remaining)

#threads={"a":"z","b":"zz"}

#print threads

#q=Queue.PriorityQueue()

#q.put((Priority.default,f,("a","b")))

#q.put((Priority.important,f,("b","b")))

#t=MyThread(q,threads)

#t.start()

#print t.name

#t2=MyThread(q,threads)

#print t2.name

#time.sleep(20)

#print "="*10

#print threads

class ThreadPool(object):
    """Priority-ordered job queue served by up to ``max_size`` workers.

    Workers are ``MyThread`` instances created on demand by ``add_job``;
    they are tracked by name in ``self.threads``, which is guarded by
    ``self.lock``.
    """

    def __init__(self, max_size=1):
        """Create an empty pool.

        Args:
            max_size: Upper bound on the number of registered workers.
        """
        self.max_size = max_size
        self.job_queue = Queue.PriorityQueue()
        self.threads = {}
        self.lock = threading.RLock()

    def add_job(self, f, args, priority=0):
        """Queue ``f(*args)`` and start another worker if one is warranted.

        Args:
            f: Callable to run.
            args: Tuple of positional arguments for ``f``.
            priority: Sort key; smaller values are served first. Defaults
                to 0, the same value as ``Priority.default``.

        Raises:
            RuntimeError: If the new worker's name is already registered.

        Jobs with equal priority are ordered by comparing the remaining
        tuple fields, so their relative order is not first-in-first-out.
        """
        self.job_queue.put((priority, f, args))
        # Workers delete their own registry entry under this lock, so the
        # size check and the insertion must hold it too.
        with self.lock:
            active = len(self.threads)
            if active >= self.max_size or active >= self.job_queue.qsize() * 2:
                # NOTE(review): a worker that has just timed out but not yet
                # unregistered still counts here, so this job may wait until
                # the next add_job call spawns a worker -- confirm whether
                # that matters for callers.
                return
            worker = MyThread(self.job_queue, self.threads, self.lock)
            if worker.name in self.threads:
                logging.error("duplicated key!!")
                raise RuntimeError(
                    "duplicated worker thread name: %s" % worker.name)
            self.threads[worker.name] = worker
            logging.debug("start new thread %s", worker.name)
            worker.start()

if __name__ == '__main__':
    # Demo: a pool capped at two workers receives three jobs with different
    # priorities; each job is ``f``, which sleeps and then prints its args.
    pool = ThreadPool(2)
    for job_args, job_priority in (((1, 2), 1), ((2, 1), -1), ((4, 4), -2)):
        pool.add_job(f, job_args, job_priority)

原文地址:https://www.cnblogs.com/lexus/p/2498408.html