C实现线程池

简介:这里使用 Linux 下的互斥锁(pthread_mutex)和条件变量(pthread_cond)实现了一个线程池。代码由一位未知作者完成,第二任作者做了补充和优化。

本人仅仅是做了一些注释工作。

代码如下: 

/*! .h */

#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

// Boolean type used throughout the pool.
// NOTE(review): TPBOOL is a typedef, not a macro, so this #ifndef test is
// always true and does not protect against a second inclusion.
#ifndef TPBOOL
typedef int TPBOOL;
#endif

#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

// Threshold used to classify the pool as idle or as normal/busy
#define BUSY_THRESHOLD 0.5  // (busy threads)/(all threads) ratio threshold
#define MANAGE_INTERVAL 5   // sleep interval, in seconds, of the manage thread between pool checks

typedef void *tp_work_desc;       // opaque job argument handed to the worker function
typedef void *(*tp_work)(void *); // tp_work points to a function taking void* and returning void*

typedef struct tp_thread_info_s tp_thread_info;
typedef struct tp_thread_pool_s tp_thread_pool;

// Per-worker-thread bookkeeping record; one array slot per worker.
struct tp_thread_info_s
{
    pthread_t       thread_id;  // thread id
    TPBOOL          is_busy;    // thread status: true - busy; false - idle
    pthread_cond_t  thread_cond; // signalled to wake the worker (new job or exit request)
    pthread_mutex_t thread_lock; // serialises access to this thread's info
    tp_work         th_work;    // function the worker calls for the current job
    tp_work_desc    th_job;     // argument passed to th_work
    TPBOOL          exit;       // set by tp_delete_thread to ask the worker to return
    TPBOOL          is_wait; // TRUE while the worker is blocked in pthread_cond_wait. CAUTION: a pthread_cond_signal sent before the worker has called pthread_cond_wait is lost!
};

// Main thread pool struct: "member functions" as function pointers plus pool state.
// creat_thread_pool() fills in the function pointers with the tp_* implementations.
struct tp_thread_pool_s
{
    TPBOOL (*init)(tp_thread_pool *this);   // start worker threads and the manage thread
    void (*close)(tp_thread_pool *this);    // stop all threads and free the thread info array
    void (*process_job)(tp_thread_pool *this, tp_work worker, tp_work_desc job); // hand a job to an idle or new worker
    int  (*get_thread_by_id)(tp_thread_pool *this, pthread_t id); // index of the worker with this id, or -1
    TPBOOL (*add_thread)(tp_thread_pool *this);     // append one worker thread (up to max_th_num)
    TPBOOL (*delete_thread)(tp_thread_pool *this);  // remove the last worker thread if it is idle
    int (*get_tp_status)(tp_thread_pool *this);     // 0: idle; 1: normal or busy

    int min_th_num;     // min thread number in the pool
    int cur_th_num;     // current thread number in the pool
    int max_th_num;         // max thread number in the pool
    pthread_mutex_t tp_lock; // pool-level lock, taken around add_thread/delete_thread bookkeeping
    pthread_t manage_thread_id; // manage thread id
    tp_thread_info *thread_info;    // array of max_th_num worker thread info records
};

// Allocate a pool object sized for max_num workers; min_num workers are started later by init().
tp_thread_pool *creat_thread_pool(int min_num, int max_num);
/*! .c */
#include "thread-pool.h"

// #define TEST_LIB

/*
 * Debug trace helper. Formats the printf-style arguments into a bounded local
 * buffer and writes them to stderr, prefixed with the current time (seconds
 * since the epoch), the calling function name and the source line.
 * Wrapped in do { } while (0) so it behaves as a single statement, e.g. in an
 * un-braced if/else. Output longer than the buffer is truncated.
 */
#define DBG_OUT(...) \
do { \
    char dbg_buf_[1024]; \
    snprintf(dbg_buf_, sizeof(dbg_buf_), __VA_ARGS__); \
    fprintf(stderr, "%lu:[%s,%d] %s", (unsigned long)time(NULL), __func__, __LINE__, dbg_buf_); \
} while (0)

static void *tp_work_thread(void *pthread);
static void *tp_manage_thread(void *pthread);

static TPBOOL tp_init(tp_thread_pool *this);
static void tp_close(tp_thread_pool *this);
static void tp_process_job(tp_thread_pool *this, tp_work worker, tp_work_desc job);
static int  tp_get_thread_by_id(tp_thread_pool *this, pthread_t id);
static TPBOOL tp_add_thread(tp_thread_pool *this);
static TPBOOL tp_delete_thread(tp_thread_pool *this);
static int  tp_get_tp_status(tp_thread_pool *this);

/**
  * user interface. creat thread pool.
  * para:
  *     num: min thread number to be created in the pool
  * return:
  *     thread pool struct instance be created successfully
  */
tp_thread_pool *creat_thread_pool(int min_num, int max_num)
{
    tp_thread_pool *this;
    this = (tp_thread_pool *)malloc(sizeof(tp_thread_pool));

    // init member function ponter
    this->init = tp_init;
    this->close = tp_close;
    this->process_job = tp_process_job;
    this->get_thread_by_id = tp_get_thread_by_id;
    this->add_thread = tp_add_thread;
    this->delete_thread = tp_delete_thread;
    this->get_tp_status = tp_get_tp_status;

    // init member var
    this->min_th_num = min_num;
    this->cur_th_num = this->min_th_num; // 初始化时将当前线程数目设置为最小线程数目
    this->max_th_num = max_num;
    pthread_mutex_init(&this->tp_lock, NULL); // 以默认方式初始化锁

    // malloc mem for num thread info struct
    // 开辟存储max_num个线程信息的空间,即最大所需空间
    this->thread_info = (tp_thread_info *)malloc(sizeof(tp_thread_info) * this->max_th_num);

    return this;
}


/**
  * Member function implementation: thread pool init.
  * Starts min_th_num worker threads and then the manage thread (the manage
  * thread is not counted in min_th_num).
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     true: successful; false: a thread could not be created. When a worker
  *     fails to start, cur_th_num is set to the number of workers that are
  *     actually running.
  */
TPBOOL tp_init(tp_thread_pool *this)
{
    int i, num = this->min_th_num;
    int err;

    // create work threads and init their info records
    for(i = 0; i < num; i++)
    {
        tp_thread_info *slot = &this->thread_info[i];

        // the worker and tp_process_job read these flags, so they must be
        // defined before the thread starts: idle, no job, no exit request
        slot->is_busy = FALSE;
        slot->exit = FALSE;
        slot->is_wait = FALSE;
        slot->th_work = NULL;
        slot->th_job = NULL;

        // per-thread condition variable and mutex
        pthread_cond_init(&slot->thread_cond, NULL);
        pthread_mutex_init(&slot->thread_lock, NULL);

        // the worker receives a pointer to its own info record
        err = pthread_create(&slot->thread_id, NULL, tp_work_thread, slot);
        if(0 != err)
        {
            pthread_mutex_destroy(&slot->thread_lock);
            pthread_cond_destroy(&slot->thread_cond);
            this->cur_th_num = i; // only slots [0, i) hold live threads
            DBG_OUT("tp_init: creat work thread failed\n");
            return FALSE;
        }
        // pthread_t is printed through unsigned long; it is already compared
        // with == elsewhere in this file, i.e. treated as an arithmetic type
        DBG_OUT("tp_init: creat work thread 0x%lX\n", (unsigned long)slot->thread_id);
    }

    // create manage thread; it receives the pool itself
    err = pthread_create(&this->manage_thread_id, NULL, tp_manage_thread, this);
    if(0 != err)
    {
        DBG_OUT("tp_init: creat manage thread failed\n");
        return FALSE;
    }
    DBG_OUT("tp_init: creat manage thread 0x%lX\n", (unsigned long)this->manage_thread_id);

    return TRUE;
}

/**
  * Member function implementation: shut the whole thread pool down.
  * Every worker and the manage thread is sent SIGQUIT (their handler ends the
  * thread) and joined; per-thread mutexes/conds and the pool lock are
  * destroyed and the thread info array is freed.
  * The pool struct itself is NOT freed here.
  * NOTE(review): threads are terminated from a signal handler, possibly in the
  * middle of a job or while holding a lock - confirm this is acceptable for
  * the jobs being run.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  */
void tp_close(tp_thread_pool *this)
{
    int i;
    void *status;

    // close work threads
    for(i = 0; i < this->cur_th_num; i++)
    {
        pthread_t tid = this->thread_info[i].thread_id;

        // pthread_kill with signal 0 sends nothing; it only probes the thread
        if(pthread_kill(tid, 0) != ESRCH) // thread exists
        {
            pthread_kill(tid, SIGQUIT);  // ask the thread to quit
            pthread_join(tid, &status);  // block until it has exited
            // release the thread's mutex and condition variable
            pthread_mutex_destroy(&this->thread_info[i].thread_lock);
            pthread_cond_destroy(&this->thread_info[i].thread_cond);
            DBG_OUT("tp_close: kill work thread 0x%lX\n", (unsigned long)tid);
        }
    }

    // close manage thread
    if(pthread_kill(this->manage_thread_id, 0) != ESRCH)
    {
        pthread_kill(this->manage_thread_id, SIGQUIT);
        pthread_join(this->manage_thread_id, &status);
        DBG_OUT("tp_close: kill manage thread 0x%lX\n", (unsigned long)this->manage_thread_id);
    }

    // the pool lock is created unconditionally, so release it unconditionally
    pthread_mutex_destroy(&this->tp_lock);

    // free thread info array and leave the pool in a harmless empty state
    free(this->thread_info);
    this->thread_info = NULL;
    this->cur_th_num = 0;
}

/**
  * member function reality. main interface opened.
  * after getting own worker and job, user may use the function to process the task.
  * para:
  *     this: thread pool struct instance ponter
  * worker: user task reality.
  * job: user task para
  * return:
  */

// Signals were still lost occasionally, so is_wait is examined with the
// thread's lock held. The worker sets is_wait under that lock and only lets go
// of the lock inside pthread_cond_wait, so seeing is_wait set while holding
// the lock means the worker really is blocked in pthread_cond_wait.
// Blocks the caller, polling once per second, until worker `idx` is waiting.
// Expects a variable `this` (tp_thread_pool *) in the calling scope.
#define TP_THREAD_IS_WAIT(idx) \
do \
{ \
    while(1) \
    { \
        pthread_mutex_lock(&this->thread_info[(idx)].thread_lock); \
        if(this->thread_info[(idx)].is_wait) \
        { \
            pthread_mutex_unlock(&this->thread_info[(idx)].thread_lock); \
            break; \
        } \
        pthread_mutex_unlock(&this->thread_info[(idx)].thread_lock); \
        sleep(1); \
    } \
}while(0)

// Hand one job to the pool.
// The first idle worker is marked busy, given the job and woken. If every
// current worker is busy, one new worker is added (subject to max_th_num) and
// receives the job. If the pool is already full the job is dropped; the void
// return type gives the caller no way to detect that.
// May block: TP_THREAD_IS_WAIT polls until the chosen worker is actually
// inside pthread_cond_wait, because a signal sent earlier would be lost.
//   this:   thread pool struct instance pointer
//   worker: user task function
//   job:    argument passed to worker
void tp_process_job(tp_thread_pool *this, tp_work worker, tp_work_desc job)
{
    int i;
    TPBOOL res;

    // look for an idle worker
    for(i = 0; i < this->cur_th_num; i++)
    {
        pthread_mutex_lock(&this->thread_info[i].thread_lock);
        if(!this->thread_info[i].is_busy)
        {
            // claim the worker and publish the job while still holding its
            // lock, so the job fields never change outside the lock
            this->thread_info[i].is_busy = TRUE;
            this->thread_info[i].th_work = worker;
            this->thread_info[i].th_job = job;
            pthread_mutex_unlock(&this->thread_info[i].thread_lock);

            // the signal only has an effect once the worker is waiting
            TP_THREAD_IS_WAIT(i);
            pthread_cond_signal(&this->thread_info[i].thread_cond); // wake the worker
            return;
        }
        pthread_mutex_unlock(&this->thread_info[i].thread_lock);
    }

    // all current threads are busy: try to grow the pool under the pool lock
    pthread_mutex_lock(&this->tp_lock);
    res = this->add_thread(this);
    if(res)
    {
        i = this->cur_th_num - 1; // slot of the newly created thread
        this->thread_info[i].th_work = worker;
        this->thread_info[i].th_job = job;
    }
    pthread_mutex_unlock(&this->tp_lock);

    if(res) // the new thread exists and can take the job
    {
        TP_THREAD_IS_WAIT(i);
        pthread_cond_signal(&this->thread_info[i].thread_cond);
    }
}

/**
  * Member function implementation: look a worker up by its thread id.
  * para:
  *     this: thread pool struct instance pointer
  *     id: thread id to search for
  * return:
  *     index of the matching record in the thread info array,
  *     or -1 when none of the current workers has that id
  */
int tp_get_thread_by_id(tp_thread_pool *this, pthread_t id)
{
    int slot = 0;

    // scan only the slots currently in use
    while(slot < this->cur_th_num)
    {
        if(this->thread_info[slot].thread_id == id)
        {
            return slot;
        }
        slot++;
    }

    return -1;
}

/**
  * Member function implementation: add one new worker thread to the pool.
  * The new worker occupies slot cur_th_num and starts out marked busy, so it
  * is not offered to anyone else before the caller has handed it a job.
  * tp_process_job calls this with tp_lock held.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     true: successful; false: pool already at max_th_num, or the thread or
  *     its synchronisation objects could not be created
  */
static TPBOOL tp_add_thread(tp_thread_pool *this)
{
    int err;
    tp_thread_info *new_thread;

    if( this->max_th_num <= this->cur_th_num ) // pool is at full capacity
    {
        DBG_OUT("Thread pool full \n");
        return FALSE;
    }

    // the info array was sized for max_th_num, so just take the next slot
    new_thread = &this->thread_info[this->cur_th_num];

    // init new thread's cond & mutex
    if(0 != pthread_cond_init(&new_thread->thread_cond, NULL))
    {
        DBG_OUT("ERROR:Create thread.\n");
        return FALSE;
    }
    if(0 != pthread_mutex_init(&new_thread->thread_lock, NULL))
    {
        pthread_cond_destroy(&new_thread->thread_cond);
        DBG_OUT("ERROR:Create thread.\n");
        return FALSE;
    }

    // NOTICE: init status is busy; no job is attached yet
    new_thread->is_busy = TRUE;
    new_thread->exit = FALSE;
    new_thread->is_wait = FALSE;
    new_thread->th_work = NULL;
    new_thread->th_job = NULL;

    err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, new_thread);
    if(0 != err)
    {
        pthread_mutex_destroy(&new_thread->thread_lock);
        pthread_cond_destroy(&new_thread->thread_cond);
        new_thread->is_busy = FALSE;
        DBG_OUT("ERROR:Create thread.\n");
        return FALSE;
    }

    // one more thread in the pool
    this->cur_th_num++;

    return TRUE;
}

/**
  * Member function implementation: delete an idle thread from the pool.
  * Only the LAST worker is ever considered: if it is idle it is asked to exit,
  * joined, and cur_th_num drops by one; otherwise nothing happens. This keeps
  * the live workers contiguous at the low end of the info array.
  * The pool lock is held for the whole operation so that "last" cannot change
  * underneath us. The worker is only signalled when it is known (is_wait seen
  * under its lock) to be blocked in pthread_cond_wait; signalling earlier
  * would lose the wake-up and leave pthread_join waiting forever.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     true: one thread removed; false: nothing removed (pool at minimum size,
  *     last thread busy, or last thread not yet waiting - try again later)
  */
static TPBOOL tp_delete_thread(tp_thread_pool *this)
{
    void *status;
    int idx;
    tp_thread_info *victim;

    pthread_mutex_lock(&this->tp_lock);

    // current thread num can't < min thread num
    if(this->cur_th_num <= this->min_th_num)
    {
        pthread_mutex_unlock(&this->tp_lock);
        DBG_OUT("current thread num can't < min thread num\n");
        return FALSE;
    }

    idx = this->cur_th_num - 1;
    victim = &this->thread_info[idx];

    // check thread status
    pthread_mutex_lock(&victim->thread_lock);

    // if last thread is busy, do nothing
    if(victim->is_busy)
    {
        // function pointer cast to void* for %p (supported on POSIX systems)
        DBG_OUT("last thread is busy, do nothing.worker=%p,job=%p\n", (void *)victim->th_work, victim->th_job);
        pthread_mutex_unlock(&victim->thread_lock);
        pthread_mutex_unlock(&this->tp_lock);
        return FALSE;
    }

    // idle but not blocked in pthread_cond_wait yet: a signal sent now would
    // be lost, so leave the thread alone until the next management round
    if(!victim->is_wait)
    {
        pthread_mutex_unlock(&victim->thread_lock);
        pthread_mutex_unlock(&this->tp_lock);
        return FALSE;
    }

    victim->is_busy = TRUE; // a thread being deleted accepts no more jobs

    // after deleting idle thread, current thread num -1
    this->cur_th_num--;

    // ask the waiting worker to exit, then wait for it to finish
    victim->exit = TRUE;
    pthread_cond_signal(&victim->thread_cond);
    pthread_mutex_unlock(&victim->thread_lock);
    pthread_join(victim->thread_id, &status);

    pthread_mutex_destroy(&victim->thread_lock);
    pthread_cond_destroy(&victim->thread_cond);
    DBG_OUT("Delete thread.index = %d\n", idx);
    pthread_mutex_unlock(&this->tp_lock);

    return TRUE;
}

/**
  * Member function implementation: classify the current pool load.
  * Counts the workers whose is_busy flag is set and compares the busy ratio
  * with BUSY_THRESHOLD. The flags are read without locking, so the result is
  * a snapshot that may already be stale.
  * para:
  *     this: thread pool struct instance pointer
  * return:
  *     0: idle; 1: normal or busy (don't process). An empty pool reports 1.
  */
static int  tp_get_tp_status(tp_thread_pool *this)
{
    float busy_num = 0.0;
    int total = this->cur_th_num; // read once so loop and division agree
    int i;

    // no threads: nothing to shrink, and the ratio below would be 0/0
    if(total <= 0)
    {
        return 1;
    }

    // get busy thread number
    for(i = 0; i < total; i++)
    {
        if(this->thread_info[i].is_busy)
        {
            busy_num++;
        }
    }

    // turn the count into a fraction of all current threads
    busy_num = busy_num / total;

    DBG_OUT("Thread pool busy status = %f.Current thread number = %d\n", busy_num, total);

    if(busy_num < BUSY_THRESHOLD)
    {
        return 0; // idle status
    }

    return 1; // busy or normal status
}

// This wrapper exists only to silence a compiler warning.
// Terminates the calling thread with a NULL exit value; it never returns, so
// the void * return type is nominal.
void *tp_thread_exit(void)
{
    pthread_exit(NULL);
}
// SIGQUIT handler installed by the worker and manage threads: logs the signal
// number and the current thread id, then terminates the calling thread.
// NOTE(review): the logging (snprintf/fprintf) and pthread_exit are not on the
// POSIX list of async-signal-safe functions; kept because tp_close relies on
// this handler to stop threads.
void handle_quit(int signo)
{
    pthread_t curid = pthread_self(); // current thread id

    DBG_OUT("Handle sig %d,thread id = 0x%lX \n", signo, (unsigned long)curid);
    tp_thread_exit();
}

/**
  * internal interface. real work thread.
  * para:
  *     pthread: thread pool struct ponter
  * return:
  */
static void *tp_work_thread(void *pthread)
{
    tp_thread_info *th = (tp_thread_info *)pthread; // main thread pool struct instance

    signal(SIGQUIT, handle_quit); // 注册SIGQUIT对应的消息处理函数

    // wait cond for processing real job.
    while( TRUE )
    {
        pthread_mutex_lock(&th->thread_lock);
        th->is_wait = TRUE;
        pthread_cond_wait(&th->thread_cond, &th->thread_lock); // 等待real job发送条件信号
        th->is_wait = FALSE;
        pthread_mutex_unlock(&th->thread_lock);

        //DBG_OUT("%d thread do work!
", pthread_self());

        if(NULL != th->th_work)
        {
            th->th_work(th->th_job);
        }

        // thread state be set idle after work
        pthread_mutex_lock(&th->thread_lock);
        th->is_busy = FALSE;
        th->th_work = NULL;
        pthread_mutex_unlock(&th->thread_lock);

        if(th->exit)
        {
            return;
        }
        //DBG_OUT("%d thread do work over!,nseq = %d
", pthread_self(),nseq);
    }
}

/**
  * Internal interface: body of the manage thread, which trims idle workers.
  * Every MANAGE_INTERVAL seconds it keeps deleting the last worker for as long
  * as the pool reports the idle state and the deletion succeeds. It never
  * returns on its own.
  * para:
  *     pthread: thread pool struct pointer
  * return:
  */
static void *tp_manage_thread(void *pthread)
{
    tp_thread_pool *pool = (tp_thread_pool *)pthread; // the pool being managed

    signal(SIGQUIT, handle_quit );

    for(;;)
    {
        sleep(MANAGE_INTERVAL);

        // stop as soon as the pool is no longer idle or the last worker
        // could not be removed
        while( pool->get_tp_status(pool) == 0 && pool->delete_thread(pool) )
        {
        }
    }
}

// 这是第2作者的测试用例
#ifdef TEST_LIB
// Demo job for the test program: prints a counter, the running thread's id and
// the job parameter once per second, 100 times.
// param carries a small integer smuggled through the void * argument.
void *thread_fun(void *param)
{
    int i;
    pthread_t curid; // current thread id

    // get current thread id
    curid = pthread_self();
    for(i = 0; i < 100; i++)
    {
        // go through long so the pointer-to-int conversion is not truncated silently
        DBG_OUT("i=%d,thread id = 0x%lX,param = %d\n", i, (unsigned long)curid, (int)(long)param);
        sleep(1);
    }
    return NULL;
}

// Pool used by the test program.
tp_thread_pool *g_threadpool;

// Test driver: builds a pool of 3..10 workers, submits four jobs one second
// apart, lets them run for ten seconds, closes the pool and then idles forever.
// Returns 1 if the pool cannot be created or started.
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    g_threadpool = creat_thread_pool(3, 10);
    if(NULL == g_threadpool)
    {
        fprintf(stderr, "creat_thread_pool failed\n");
        return 1;
    }
    if(!g_threadpool->init(g_threadpool))
    {
        fprintf(stderr, "thread pool init failed\n");
        return 1;
    }

    g_threadpool->process_job(g_threadpool, thread_fun, (void *)1);
    sleep(1);
    g_threadpool->process_job(g_threadpool, thread_fun, (void *)2);
    sleep(1);
    g_threadpool->process_job(g_threadpool, thread_fun, (void *)3);
    sleep(1);
    // all three initial workers are busy now, so this one forces a new thread
    g_threadpool->process_job(g_threadpool, thread_fun, (void *)4);

    sleep(10);
    g_threadpool->close(g_threadpool);
    while(1)
    {
        sleep(2);
    }
}
#endif
View Code


下面摘出向工作线程添加任务、以及工作线程等待并执行任务时的主要互斥与同步操作。

/**
 *
 * Mutual-exclusion / synchronisation steps applied to a worker thread.
 * State once the thread has been created:
 * is_busy = FALSE; is_exit = FALSE; is_wait = FALSE;
 * NOTE(review): tp_add_thread in the full source starts dynamically added
 * threads with is_busy = TRUE - the line above matches the initial workers only.
 */
/*! Handing a job to a thread */
pthread_mutex_lock(thread_lock);
if(!is_busy)
{
    is_busy = TRUE;
    pthread_mutex_unlock(thread_lock);
    TP_THREAD_IS_WAIT(idx); // make sure the signal is not lost
    pthread_cond_signal(thread_cond);
}
else
{
    pthread_mutex_unlock(thread_lock);
}

/*! Worker side: receiving and executing a job */
// the thread waits for a job to be handed over
while(1)
{
    pthread_mutex_lock(thread_lock);
    is_wait = TRUE;
    pthread_cond_wait(thread_cond, thread_lock);
    is_wait = FALSE;
    pthread_mutex_unlock(thread_lock);
    // job execution starts
    // ————————————
    // job execution finished
    pthread_mutex_lock(thread_lock);
    is_busy = FALSE;
    pthread_mutex_unlock(thread_lock);

    if(is_exit) break;
}

任务执行完成后,工作线程把 is_busy 置为 FALSE,添加任务的一方此时就可能立刻发出条件信号;而工作线程这时未必已经重新进入 pthread_cond_wait。条件变量的信号不会排队保存,没有线程在等待时发出的信号会直接丢失。
所以第二任作者写了一个 TP_THREAD_IS_WAIT 宏,在发信号之前先轮询 is_wait,确认工作线程已经处于等待状态。TP_THREAD_IS_WAIT 宏实现如下:

/* Simplified excerpt: poll, once per second and under the thread's lock,
 * until the worker has set is_wait, i.e. is blocked in pthread_cond_wait.
 * No trailing ';' after while(0), so the macro is usable as one statement. */
#define TP_THREAD_IS_WAIT(idx) \
do \
{ \
    while(1) \
    { \
        pthread_mutex_lock(thread_lock); \
        if(is_wait) \
        { \
            pthread_mutex_unlock(thread_lock); \
            break; \
        } \
        pthread_mutex_unlock(thread_lock); \
        sleep(1); \
    } \
}while(0)

总之,这里实现的线程池麻雀虽小,五脏俱全,可以作为学习互斥锁和条件变量之用。

原文地址:https://www.cnblogs.com/fengkang1008/p/4850750.html