linux线程池

typedef struct task_node 
{
  void *arg;                    /* fun arg. */
  void *(*fun) (void *);        /* the real work of the task. */
  pthread_t tid;                /* which thread exec this task. */
  int work_id;                  /* task id. */
  int flag;                     /* 1: assigned, 0: unassigned. */
  struct task_node *next;        
  pthread_mutex_t mutex;        /* when modify this ds and exec the work,lock the task ds. */
} TASK_NODE;


/*
*the ds  of the task_queue
*/ 
typedef struct task_queue 
{
  pthread_mutex_t mutex;
  pthread_cond_t cond;          /* when no task, the manager thread wait for ;when a new task come, signal. */
  struct task_node *head;       /* point to the task_link. */
  int number;                   /* current number of task, include unassinged and assigned but no finished. */
} TASK_QUEUE_T;


/*
*the ds of every thread, make all thread in a double link queue.
*/ 
typedef struct pthread_node 
{
  pthread_t tid;               /* the pid of this thread in kernel,the value is  syscall return . */
  int flag;                    /*  1:busy, 0:free. */
  struct task_node *work;      /*  if exec a work, which work. */
  struct pthread_node *next;
  struct pthread_node *prev;
  pthread_cond_t cond;        /* when assigned a task, signal this child thread by manager. */
  pthread_mutex_t mutex; 
} THREAD_NODE;


/*
*the ds of the thread queue
*/ 
typedef struct pthread_queue 
{
  int number;                  /* the number of thread in this queue. */
  struct pthread_node *head;
  struct pthread_node *rear;
  pthread_cond_t cond;        /* when no idle thread, the manager wait for ,or when a thread return with idle, signal. */
  pthread_mutex_t mutex; 
} PTHREAD_QUEUE_T;

四大结构:

1、任务池struct,用于任务的管理,其中的head表示任务队列的第一个元素。内部的cond和mutex用于多线程操作此任务池

2、任务结点task_node,对应于相应的任务,这个任务结点中,包含任务对应的要执行的函数(此函数完成我们真正的socket数据交互等待,write或者read等),完成此任务对应的线程id(用于找到线程),任务分配与否的标志flag,以及操作此任务node的mutex和connd

3、线程池struct pthread_queue,用于管理线程,有线程的首指针和尾指针,线程的个数number,线程池在代码中包括空闲线程和工作线程两大类,分别用两个变量表示

4、线程struc pthread_node,对应线程。包括线程的id,工作状态,此线程要运行的任务task_node指针,用于找到任务,然后 struct pthread_node *next;struct pthread_node *prev形成一个线程结构的双向链表

主程序:

PTHREAD_QUEUE_T * pthread_queue_idle;    /* the idle thread double link queue. */     
PTHREAD_QUEUE_T *pthread_queue_busy;      /* the work thread double link queue. */
TASK_QUEUE_T *task_queue_head;            /* the task queuee single link list. */

int
main (int argc, char *argv[]) 
{
  pthread_t thread_manager_tid, task_manager_tid, monitor_id;
  
  init_system ();
 
  pthread_create (&thread_manager_tid, NULL, thread_manager, NULL);  /* create thread to manage the thread pool. */
  pthread_create (&task_manager_tid, NULL, task_manager, NULL);    /* create thread recive task from client. */
  pthread_create (&monitor_id, NULL, monitor, NULL);               /* create thread to monitor the system info. */
  
  pthread_join (thread_manager_tid, NULL);
  pthread_join (task_manager_tid, NULL);
  pthread_join (monitor_id, NULL);
  
  sys_clean ();
  
  return 0;
}

开了三个线程进行管理,

1、第一个线程是我们的线程分发和任务分发的thread_manager函数对应的线程。

pthread_create (&thread_manager_tid, NULL, thread_manager, NULL);

2、第二个线程是我们进行任务管理的线程,用于处理client新连接,对应产生的新任务,这个线程中,包含着socket创建,bind,listen,accept等过程,然后开始创建task_node结点,并初始化此节点,注册对应的任务工作函数fun,接下来的就是把此任务结点,加入到我们的任务池中,最后的步骤,就是通知我们运行thread_manger函数的线程,任务池有新的任务了,如果你堵塞在这个任务池的connd中,现在可以被唤醒了,接着我们的thread_manger函数对应的线程就可以继续工作了。

 pthread_create (&task_manager_tid, NULL, task_manager, NULL);

我们的thread_manger和task_manager函数的实现:其实thread_manger函数改名为thread_task_dispath更加准确

void *
thread_manager (void *ptr) 
{  
  while (1)
  {
    THREAD_NODE * temp_thread = NULL;      
    TASK_NODE * temp_task = NULL;      

        /*
         *get a new task, and modify the task_queue.
         *if no task block om task_queue_head->cond.
         */ 
    pthread_mutex_lock (&task_queue_head->mutex);      

    if (task_queue_head->number == 0)        
      pthread_cond_wait (&task_queue_head->cond,
                             &task_queue_head->mutex);      

    temp_task = task_queue_head->head;      
    task_queue_head->head = task_queue_head->head->next;      
    task_queue_head->number--;      

    pthread_mutex_unlock (&task_queue_head->mutex);
      

        /*
         *get a new idle thread, and modify the idle_queue.
         *if no idle thread, block on pthread_queue_idle->cond.
         */ 
        pthread_mutex_lock (&pthread_queue_idle->mutex);      

    if (pthread_queue_idle->number == 0)        
      pthread_cond_wait (&pthread_queue_idle->cond,
                            &pthread_queue_idle->mutex);      

    temp_thread = pthread_queue_idle->head;
      
        /*if this is the last idle thread ,modiry the head and rear pointor */ 
    if (pthread_queue_idle->head == pthread_queue_idle->rear)
    {
      pthread_queue_idle->head = NULL;
      pthread_queue_idle->rear = NULL;
    }      
        /*if idle thread number>2, get the first one,modify the head pointor  */ 
    else
    {
      pthread_queue_idle->head = pthread_queue_idle->head->next;
      pthread_queue_idle->head->prev = NULL;
    }
      
    pthread_queue_idle->number--;
      
    pthread_mutex_unlock (&pthread_queue_idle->mutex);      

        /*modify the  task attribute. */ 
    pthread_mutex_lock (&temp_task->mutex);
      
    temp_task->tid = temp_thread->tid;
    temp_task->next = NULL;
    temp_task->flag = 1;
      
    pthread_mutex_unlock (&temp_task->mutex);      

        /*modify the idle thread attribute. */ 
    pthread_mutex_lock (&temp_thread->mutex);
      
    temp_thread->flag = 1;
    temp_thread->work = temp_task;
    temp_thread->next = NULL;
    temp_thread->prev = NULL;
      
    pthread_mutex_unlock (&temp_thread->mutex);      

        /*add the thread assinged task to the busy queue. */ 
    pthread_mutex_lock (&pthread_queue_busy->mutex);      

        /*if this is the first one in busy queue */ 
    if (pthread_queue_busy->head == NULL)        
    {
       pthread_queue_busy->head = temp_thread;          
      pthread_queue_busy->rear = temp_thread;          
      temp_thread->prev = temp_thread->next = NULL;        
    }
    else        
    {          
            /*insert in thre front of the queue */ 
      pthread_queue_busy->head->prev = temp_thread;          
      temp_thread->prev = NULL;          
      temp_thread->next = pthread_queue_busy->head;          
      pthread_queue_busy->head = temp_thread;          
      pthread_queue_busy->number++;    
    }
    pthread_mutex_unlock (&pthread_queue_busy->mutex);      

        /*signal the child thread to exec the work */ 
    pthread_cond_signal (&temp_thread->cond);    
  }
}
View Code

thread_manger主要过程:

首先从任务池中得到一个任务,如果没有空闲任务,就堵塞,直到我们有新的client连接,产生了一个新的任务,并加入到我们的任务池中,此过程由task_manager对应的线程完成,得到任务之后,我们再通过得到线程池中的空闲线程,如果没有空闲线程就堵塞,如果有空闲线程,首先从空闲线程池中获取首个空闲线程(由双向链表组成的空闲线程),然后修改此空闲线程池。完成之后,修改之前得到的task的属性,包括下面的步骤: 

temp_task->tid = temp_thread->tid;
temp_task->next = NULL;
temp_task->flag = 1;

其中temp是我们得到的任务task_node,把得到的空闲线程的id赋值给我们的task中的tid字段,这样我们的task和我们的刚刚得到空闲线程,就通过tid联系在一起了,同时修改task中的其他字段,例如标志字段flag等等

修改了task属性之后,我们开始修改temp_thread结构(就是得到的空闲线程),包括以下操作:

temp_thread->flag = 1;
temp_thread->work = temp_task;
temp_thread->next = NULL;
temp_thread->prev = NULL;

同理,我们把thread对应的work字段,赋值我们刚才的task,这样在thread和task中,都建立了他们之间的联系。一个线程有自己的任务指针,一个任务同样有一个线程的tid。两者紧密联系起来。

最后我们修改我们的线程池thread_queue,这包括空闲线程池和工作线程池(因为我们声明了两个struct thread_queue的变量),第一个已经修改了空闲线程池,这一步我们修改工作线程池,把这个线程加入工作线程中

pthread_cond_signal (&temp_thread->cond);  //结尾我们通知,堵塞在这里的工作线程work_thread,你们可以开始工作了,因为上面的步骤,就是为工作线程争取任务队列,和空闲线程(上面的空闲线程,其实是空闲线程对应的结构体,但是可以通过或许他们,来表示获取线程,真正的线程其实是运行child_work函数的线程。)

关键点:通过使用thread_node来代表线程,这个结构体虽然不是线程,但是可以通过它,我们进行线程的操作和模拟过程

工作线程 pthread_create (&temp[i].tid, NULL, child_work, (void *) &temp[i]); 这里才是创建工作线程的地方

关键点:我们创建的pthread_node结点是不被free的,就是一旦在初始化函数中,这些结构体被创建,然后这些结构就一直存在,我们后续的线程,会改变这些thread_node结构的内容。

void *
child_work (void *ptr) 
{
  THREAD_NODE * self = (THREAD_NODE *) ptr;
  
  /*modify the tid attribute the first time exec */ 
  pthread_mutex_lock (&self->mutex);
  
  self->tid = syscall (SYS_gettid);
  
  pthread_mutex_unlock (&self->mutex);
  
  while (1)
  {
    pthread_mutex_lock (&self->mutex);
     
    /*if no task exec,blocked */ 
    if (NULL == self->work)
    {   
      pthread_cond_wait (&self->cond, &self->mutex);
    }
      
    pthread_mutex_lock (&self->work->mutex);
      
    /*execute the real work. */ 
    self->work->fun (self->work->arg);
      
    /*after finished the work */ 
    self->work->fun = NULL;
    self->work->flag = 0;
    self->work->tid = 0;
    self->work->next = NULL;
      
    free (self->work->arg);
      
    pthread_mutex_unlock (&self->work->mutex);  //unlock the task
    pthread_mutex_destroy (&self->work->mutex);
      
    /*free the task space */ 
    free (self->work);
      
    /*make self thread no work */ 
    self->work = NULL;
    self->flag = 0;
      
    pthread_mutex_lock (&task_queue_head->mutex);
      

        /*
         *get new task from the task_link if not NULL.
         *there no idle thread if there are task to do.
         *if on task ,make self idle and add to the idle queue.
         */ 
    if (task_queue_head->head != NULL)
    {
      TASK_NODE * temp = task_queue_head->head;
          
      /*get the first task */ 
      task_queue_head->head = task_queue_head->head->next;          

      /*modify self thread attribute */ 
      self->flag = 1;          
      self->work = temp;          
      temp->tid = self->tid;          
      temp->next = NULL;          
      temp->flag = 1;          

      task_queue_head->number--;          

      pthread_mutex_unlock (&task_queue_head->mutex);          

      pthread_mutex_unlock (&self->mutex);          

      continue;        
    }     
    else        
    {          
      /*no task need to exec, add self to idle queue and del from busy queue */ 
      pthread_mutex_unlock (&task_queue_head->mutex);        

      pthread_mutex_lock (&pthread_queue_busy->mutex);          

          /*self is the last execte thread */ 
      if (pthread_queue_busy->head == self
                && pthread_queue_busy->rear == self)
      {              
        pthread_queue_busy->head = pthread_queue_busy->rear = NULL;              
        self->next = self->prev = NULL;    
      }
          
          /*the first one thread in busy queue */ 
      else if (pthread_queue_busy->head == self
                     && pthread_queue_busy->rear != self)
      {              
        pthread_queue_busy->head = pthread_queue_busy->head->next;              
        pthread_queue_busy->head->prev = NULL;
              
        self->next = self->prev = NULL;            
      }
          
            /*the last one thread in busy queue */ 
      else if (pthread_queue_busy->head != self
                     && pthread_queue_busy->rear == self)            
      {              
        pthread_queue_busy->rear = pthread_queue_busy->rear->prev;              
        pthread_queue_busy->rear->next = NULL;
           
        self->next = self->prev = NULL;            
      }
          
      /*middle one */ 
      else            
      {              
        self->next->prev = self->prev;              
        self->prev->next = self->next;              
        self->next = self->prev = NULL;            
      }          

      pthread_mutex_unlock (&pthread_queue_busy->mutex);          

        /*add self to the idle queue */ 
      pthread_mutex_lock (&pthread_queue_idle->mutex);          

        /*now the idle queue is empty */ 
      if (pthread_queue_idle->head == NULL
                || pthread_queue_idle->head == NULL)
      {              
        pthread_queue_idle->head = pthread_queue_idle->rear = self;              
        self->next = self->prev = NULL;            
      }
          
      else
      {              
        self->next = pthread_queue_idle->head;              
        self->prev = NULL;              
        self->next->prev = self;
           
        pthread_queue_idle->head = self;              
        pthread_queue_idle->number++;       
      }          

      pthread_mutex_unlock (&pthread_queue_idle->mutex);          

      pthread_mutex_unlock (&self->mutex);          

        /*signal have idle thread */ 
      pthread_cond_signal (&pthread_queue_idle->cond);    
    }    
  }
}
View Code

这里,我们的工作线程,首先会查看,通过参数传递进来的,线程结构thread_node,是否未被分配任务,通过self-work==NULL.如果已经分配任务,就开始实行任务函数fun,执行完毕之后,在此函数中,修改任务的属性,然后free掉task_node结构体,同时也要进行thread_node属性的修改:

/*make self thread no work */
self->work = NULL;
self->flag = 0;

完成了一个任务之后,如果任务队列中还有任务的话,则直接运行此任务(说明,我们只需要通过thread_manager线程第一次分配任务和线程,一旦得到这个线程,并开始工作之后,我们就可以一直工作,直到工作线程中得不到任务,才等待线程池中的cond

如果工作线程完成一个任务之后,没有新的任务,则我们首先要把此线程加入空闲队列,删除工作队列,然后最后一步就是通知我们的线程管理线程,已经新的空闲线程了。

/*signal have idle thread */ 

pthread_cond_signal (&pthread_queue_idle->cond);

(这个线程池的实现,并没有动态的增加和删除线程,如果我们当前没有空闲线程,这个就会等待,知道有空闲线程被放进来)


初始化过程:

init_system ();

void  create_pthread_pool (void);

void
create_pthread_pool (void) 
{
  THREAD_NODE * temp =
    (THREAD_NODE *) malloc (sizeof (THREAD_NODE) * THREAD_DEF_NUM);
  
  if (temp == NULL)
  {
    printf (" malloc failure
");
    exit (EXIT_FAILURE);
  }

    /*init as a double link queue */ 
  int i;
  
  for (i = 0; i < THREAD_DEF_NUM; i++)
  {      
    temp[i].tid = i + 1;      
    temp[i].work = NULL;      
    temp[i].flag = 0;      

    if (i == THREAD_DEF_NUM - 1)        
      temp[i].next = NULL;
      
    if (i == 0)        
      temp[i].prev = NULL;
      
    temp[i].prev = &temp[i - 1];      
    temp[i].next = &temp[i + 1];      

    pthread_cond_init (&temp[i].cond, NULL);      
    pthread_mutex_init (&temp[i].mutex, NULL);      

        /*create this thread */ 
    pthread_create (&temp[i].tid, NULL, child_work, (void *) &temp[i]);    
  } 

    /*modify the idle thread queue attribute */ 
  pthread_mutex_lock (&pthread_queue_idle->mutex);  

  pthread_queue_idle->number = THREAD_DEF_NUM;  
  pthread_queue_idle->head = &temp[0];  
  pthread_queue_idle->rear = &temp[THREAD_DEF_NUM - 1];  

  pthread_mutex_unlock (&pthread_queue_idle->mutex);
} 

/*
*init_system :init the system glob pointor.
*/ 
void
init_system (void) 
{  
    /*init the pthread_queue_idle */ 
  pthread_queue_idle =
    (PTHREAD_QUEUE_T *) malloc (sizeof (PTHREAD_QUEUE_T));
  
  pthread_queue_idle->number = 0;  
  pthread_queue_idle->head = NULL;  
  pthread_queue_idle->rear = NULL;  
  pthread_mutex_init (&pthread_queue_idle->mutex, NULL);  
  pthread_cond_init (&pthread_queue_idle->cond, NULL);  

    /*init the pthread_queue_busy */ 
    pthread_queue_busy =
    (PTHREAD_QUEUE_T *) malloc (sizeof (PTHREAD_QUEUE_T));
  
  pthread_queue_busy->number = 0;  
  pthread_queue_busy->head = NULL;  
  pthread_queue_busy->rear = NULL;  
  pthread_mutex_init (&pthread_queue_busy->mutex, NULL);  
  pthread_cond_init (&pthread_queue_busy->cond, NULL);  

    /*init the task_queue_head */ 
    task_queue_head = (TASK_QUEUE_T *) malloc (sizeof (TASK_QUEUE_T));
  
  task_queue_head->head = NULL;  
  task_queue_head->number = 0;  
  pthread_cond_init (&task_queue_head->cond, NULL);  
  pthread_mutex_init (&task_queue_head->mutex, NULL);  

    /*create thread poll */ 
  create_pthread_pool ();
}
View Code

重点是创建我们的工作线程work_thread,

 for (i = 0; i < THREAD_DEF_NUM; i++)
  {      
    temp[i].tid = i + 1;      
    temp[i].work = NULL;      
    temp[i].flag = 0;      

    if (i == THREAD_DEF_NUM - 1)        
      temp[i].next = NULL;
      
    if (i == 0)        
      temp[i].prev = NULL;
      
    temp[i].prev = &temp[i - 1];      
    temp[i].next = &temp[i + 1];      

    pthread_cond_init (&temp[i].cond, NULL);      
    pthread_mutex_init (&temp[i].mutex, NULL);      

        /*create this thread */ 
    pthread_create (&temp[i].tid, NULL, child_work, (void *) &temp[i]);    
  } 

这里我们创建了我们的工作线程,还有也创建了与工作线程对于的thread_node结构。这样我们就可以通过这个结构去代表我们的线程,这个结构从这里创建之后,之后只会被修改,但是不会free,而我们的task_node则在child_work()函数中被free掉。

结构框架:

运行三个管理管控线程,thread_manager,task_mangager,和monitor.

初始化若干个工作线程,同时初始化线程池结构和任务池结构task_quque,thread_queue

原文地址:https://www.cnblogs.com/kkshaq/p/4455844.html