C语言实现线程池

以前写过一篇关于如何使用多线程推升推送速度(http://www.cnblogs.com/bai-jimmy/p/5177433.html),能够到达5000qps,其实已经可以满足现在的业务,不过在看nginx的说明文档时,又提到nginx支持线程池来提升响应速度, 一直对如何实现线程池很感兴趣,利用周末的时间参考别人的代码,自己写了一个初级版,并且调通了,还没在实际开发中应用,不知道效果如何

代码如下:

pd_log.h

#ifndef __pd_log_
#define __pd_log_

#define LOG_DEBUG_PATH "debug.log"
#define LOG_ERROR_PATH "error.log"

/**
 * define log level
 */
enum log_level {
        DEBUG = 0,
        ERROR = 1 
};

#define error(...) 
        logger(ERROR, __LINE__, __VA_ARGS__)

#define debug(...) 
        logger(DEBUG, __LINE__, __VA_ARGS__)

#define assert(expr, rc) 
        if(!(expr)){    
                error(#expr"is null or 0");     
                return rc;      
        }
#endif

pd_log.c

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <time.h>

#include "pd_log.h"

/**
 * get now timestr
 */
static void get_time(char *time_str, size_t len) {
    time_t tt;
    struct tm local_time;
    time(&tt);
    localtime_r(&tt, &local_time);
    strftime(time_str, len, "%m-%d %H:%M:%S", &local_time);
}

/**
 * log
 */
static void logger(int flag, int line, const char *fmt, ...) {
    FILE *fp = NULL;
    char time_str[20 + 1];
    va_list args;
    get_time(time_str, sizeof(time_str));

    switch (flag) {
        case DEBUG:
            fp = fopen(LOG_DEBUG_PATH, "a");
            if (!fp) {
                return;
            }
            fprintf(fp, "%s DEBUG (%d:%d) ", time_str, getpid(), line);
            break;
        case ERROR:
            fp = fopen(LOG_ERROR_PATH, "a");
            if (!fp) {
                return;
            }
            fprintf(fp, "%s ERROR (%d:%d) ", time_str, getpid(), line);
            break;
        default:
            return;
    }

    va_start(args, fmt);
    vfprintf(fp, fmt, args);
    va_end(args);
    fprintf(fp, "
");

    fclose(fp);
    return;
}

pd_pool.h

/**
 * 线程池头文件
 * @author jimmy
 * @date 2016-5-14
 */
#ifndef __PD_POOL_
#define __PD_POOL_

/*任务链表*/
typedef struct task_s{
        void (*routine)(void *);
        void *argv;
        struct task_s *next;
} pd_task_t;

/*任务队列*/
typedef struct queue_s{
        pd_task_t *head;
        pd_task_t **tail;
        size_t max_task_num;
        size_t cur_task_num;
}pd_queue_t;

/*线程池*/
typedef struct pool_s{
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        pd_queue_t queue;
        size_t thread_num;
        //size_t thread_stack_size;
}pd_pool_t;

/*初始化线程池*/
//pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_stack_size, size_t thread_max_num);
#endif

pd_poo.c

/**
 * 线程池
 * @author jimmy
 * @date 2016-5-14
 */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include <pthread.h>

#include "pd_log.h"
#include "pd_log.c"
#include "pd_pool.h"

/*tsd*/
pthread_key_t key;

void *pd_worker_dispatch(void *argv){
        ushort exit_flag = 0;
        pd_task_t *a_task;
        pd_pool_t *a_pool = (pd_pool_t *)argv;
        if(pthread_setspecific(key, (void *)&exit_flag) != 0){
                return NULL;
        }
        /*动态从任务列表中获取任务执行*/
        while(!exit_flag){
                pthread_mutex_lock(&a_pool->mutex);
                /*如果此时任务链表为空,则需要等待条件变量为真*/
                while(a_pool->queue.head == NULL){
                        pthread_cond_wait(&a_pool->cond, &a_pool->mutex);
                }
                /*从任务链表中任务开支执行*/
                a_task = a_pool->queue.head;
                a_pool->queue.head = a_task->next;
                a_pool->queue.cur_task_num--;
                if(a_pool->queue.head == NULL){
                        a_pool->queue.tail = &a_pool->queue.head;
                }
                /*解锁*/
                pthread_mutex_unlock(&a_pool->mutex);
                /*执行任务*/
                a_task->routine(a_task->argv);
//core
                free(a_task);
                a_task = NULL;
        }
        pthread_exit(0);
}

/**
 * 根据线程数创建所有的线程
 */
static int pd_pool_create(pd_pool_t *a_pool){
        int i;
        pthread_t tid;
        for(i = 0; i < a_pool->thread_num; i++){
                pthread_create(&tid, NULL, pd_worker_dispatch, a_pool);
        }
        return 0;
}

/**
 * 线程退出函数
 */
void pd_pool_exit_cb(void *argv){
        unsigned int *lock = argv;
        ushort *exit_flag_ptr = pthread_getspecific(key);
        *exit_flag_ptr = 1;
        pthread_setspecific(key, (void *)exit_flag_ptr);
        *lock = 0;
}

/**
 * 线程池初始化
 */
pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num){
        pd_pool_t *a_pool = NULL;
        a_pool = calloc(1, sizeof(pd_pool_t));
        if(!a_pool){
                error("pool_init calloc fail: %s", strerror(errno));
                return NULL;
        }
        a_pool->thread_num = thread_num;
        //初始化队列参数
        a_pool->queue.max_task_num = thread_max_num;
        a_pool->queue.cur_task_num = 0;
        a_pool->queue.head = NULL;
        a_pool->queue.tail = &a_pool->queue.head;
        //初始化tsd
        if(pthread_key_create(&key, NULL) != 0){
                error("pthread_key_create fail: %s", strerror(errno));
                goto err;
        }
        //初始化互斥锁
        if(pthread_mutex_init(&a_pool->mutex, NULL) != 0){
                error("pthread_mutex_init fail: %s", strerror(errno));
                pthread_key_delete(key);
                goto err;
        }
        //初始化条件变量
        if(pthread_cond_init(&a_pool->cond, NULL) != 0){
                error("pthread_cond_init fail: %s", strerror(errno));
                pthread_mutex_destroy(&a_pool->mutex);
                goto err;
        }
        //创建线程池
        if(pd_pool_create(a_pool) != 0){
                error("pd_pool_create fail: %s", strerror(errno));
                pthread_mutex_unlock(&a_pool->mutex);
                pthread_cond_destroy(&a_pool->cond);
                goto err;
        }
        return a_pool;
err:
        free(a_pool);
        return NULL;
}

/**
 * 向线程池中添加任务..
 */
int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv){
        pd_task_t *a_task = NULL;
        a_task = (pd_task_t *)calloc(1, sizeof(pd_task_t));
        if(!a_task){
                error("add task calloc faile: %s", strerror(errno));
                return -1;
        }
        a_task->routine = routine;
        a_task->argv = argv;
        a_task->next = NULL;
        /*加锁*/
        pthread_mutex_lock(&a_pool->mutex);
        if(a_pool->queue.cur_task_num >= a_pool->queue.max_task_num){
                error("cur_task_num >= max_task_num");
                goto err;
        }
        /*将任务放到末尾*/
        *(a_pool->queue.tail) = a_task;
        a_pool->queue.tail = &a_task->next;
        a_pool->queue.cur_task_num++;
        /*通知堵塞的线程*/
        pthread_cond_signal(&a_pool->cond);
        /*解锁*/
        pthread_mutex_unlock(&a_pool->mutex);
        return 0;
err:
        pthread_mutex_unlock(&a_pool->mutex);
        free(a_task);
        return -1;
}

void pd_pool_destroy(pd_pool_t *a_pool){
        unsigned int n;
        unsigned int lock;

        for(n = 0; n < a_pool->thread_num; n++){
                lock = 1;
                if(pd_pool_add_task(a_pool, pd_pool_exit_cb, &lock) != 0){
                        error("pd_pool_destroy fail: add_task fail");
                        return;
                }
                while(lock){
                        usleep(1);
                }
        }
        pthread_mutex_destroy(&a_pool->mutex);
        pthread_cond_destroy(&a_pool->cond);
        pthread_key_delete(key);
        free(a_pool);
}
/******************************************************************************************/

void testfun(void *argv){
        printf("testfun
");
        sleep(1);
}

int main(){
        pd_pool_t *a_pool = pd_pool_init(9, 5);

        pd_pool_add_task(a_pool, testfun, NULL);
        pd_pool_add_task(a_pool, testfun, NULL);
        pd_pool_add_task(a_pool, testfun, NULL);

        pd_pool_destroy(a_pool);
}
原文地址:https://www.cnblogs.com/bai-jimmy/p/5499147.html