深入学习keepalived之预备工作--线程

1. 线程的定义

1.1 线程定义在scheduler.h文件中,其定义如下所示

/*
 * Thread itself: one schedulable unit of work owned by a thread_master_t.
 * Instances are chained into one of the master's lists through next/prev.
 */
typedef struct _thread {
    unsigned long id;          /* identifier; set to 0 on creation, assigned by thread_call() before func runs */
    unsigned char type;        /* thread type, one of the THREAD_* constants */
    struct _thread *next;        /* next pointer of the thread */
    struct _thread *prev;        /* previous pointer of the thread */
    struct _thread_master *master;    /* pointer to the struct thread_master. */
    int (*func) (struct _thread *);    /* event function; invoked by thread_call() with the thread itself */
    void *arg;            /* event argument */
    timeval_t sands;        /* expiry time; thread_fetch() treats the thread as timed out once time_now >= sands */
    union {
        int val;        /* second argument of the event. */
        int fd;            /* file descriptor in case of read/write. */
        struct {
            pid_t pid;    /* process id a child thread is wanting. */
            int status;    /* return status of the process, stored by thread_child_handler() */
        } c;
    } u;
} thread_t;

1.2. 线程链表定义

/* Linked list of thread: elements are chained through thread_t.next / thread_t.prev. */
typedef struct _thread_list {
    thread_t *head;    /* first element, NULL when the list is empty */
    thread_t *tail;    /* last element */
    int count;    /* element count (presumably maintained by the thread_list_* helpers, not shown here) */
} thread_list_t;

线程类型的定义如下:

/* Thread types. */
#define THREAD_READ        0             /* read thread: waiting for its fd to become readable */
#define THREAD_WRITE        1            /* write thread: waiting for its fd to become writable */
#define THREAD_TIMER        2            /* timer thread */
#define THREAD_EVENT        3            /* event thread */
#define THREAD_CHILD        4            /* child thread: waiting on a child process (u.c.pid) */
#define THREAD_READY        5            /* ready thread: queued on the ready list */
#define THREAD_UNUSED        6           /* unused thread: recycled onto the unuse list */
#define THREAD_WRITE_TIMEOUT    7        /* write thread whose timeout expired */
#define THREAD_READ_TIMEOUT    8         /* read thread whose timeout expired */
#define THREAD_CHILD_TIMEOUT    9        /* child thread whose timeout expired */
#define THREAD_TERMINATE    10            /* terminate event: makes thread_fetch() return NULL */
#define THREAD_READY_FD        11        /* read/write thread whose fd was reported ready by select() */

1.3.主线程定义

/*
 * Master of the threads: holds one list per thread state plus the fd sets
 * that thread_fetch() copies and passes to select().
 */
typedef struct _thread_master {
    thread_list_t read;    /* threads waiting for a readable fd */
    thread_list_t write;    /* threads waiting for a writable fd */
    thread_list_t timer;    /* timer threads */
    thread_list_t child;    /* threads waiting on a child process */
    thread_list_t event;    /* event threads; served first by thread_fetch() */
    thread_list_t ready;    /* threads ready to be returned by thread_fetch() */
    thread_list_t unuse;    /* recycled threads, reused by thread_new() */
    fd_set readfd;    /* fds registered by read threads */
    fd_set writefd;    /* fds tested against by write threads */
    fd_set exceptfd;    /* exception fd set passed to select() */
    unsigned long alloc;    /* incremented by thread_new() for every fresh allocation */
} thread_master_t;

2. 线程操作

2.1 生成主线程

/* Global scheduler master used by launch_scheduler(); NULL until one is assigned. */
thread_master_t *master = NULL;

/*
 * Allocate a new thread master.
 *
 * Returns whatever MALLOC() hands back for one thread_master_t; no field
 * is initialized explicitly here.
 */
thread_master_t *
thread_make_master(void)
{
    thread_master_t *m = (thread_master_t *) MALLOC(sizeof (*m));

    return m;
}

2.2 销毁一个主线程

    

/*
 * Stop the thread scheduler and dispose of its master.
 *
 * m: master to destroy.
 */
void
thread_destroy_master(thread_master_t * m)
{
    /* Step 1: empty the lists and fd sets the master still tracks. */
    thread_cleanup_master(m);

    /* Step 2: give the master structure itself back to the allocator. */
    FREE(m);
}

//调用子函数,清空主线程的内容
/*
 * Cleanup master: release everything the master still tracks.
 *
 * Each per-state list is handed to thread_destroy_list(), the three fd
 * sets are cleared, and thread_clean_unuse() disposes of the recycled
 * threads.
 *
 * m: master to clean; the structure itself is not freed here.
 */
static void
thread_cleanup_master(thread_master_t * m)
{
    /*
     * Unuse current thread lists. The child list is included as well,
     * otherwise pending child watchers would never reach the unuse list
     * and would be left behind when the master goes away.
     */
    thread_destroy_list(m, m->read);
    thread_destroy_list(m, m->write);
    thread_destroy_list(m, m->timer);
    thread_destroy_list(m, m->child);
    thread_destroy_list(m, m->event);
    thread_destroy_list(m, m->ready);

    /* Clear all FDs */
    FD_ZERO(&m->readfd);
    FD_ZERO(&m->writefd);
    FD_ZERO(&m->exceptfd);

    /* Clean garbage */
    thread_clean_unuse(m);
}
/* Release the memory of the master itself (second step of thread_destroy_master()). */
FREE(m);

2.3 增加一个简单的事件线程

/*
 * Queue a terminate event on master m.
 *
 * The thread carries no callback and no argument; it is appended to the
 * event list with type THREAD_TERMINATE.
 *
 * Returns the queued thread.
 */
thread_t *
thread_add_terminate_event(thread_master_t * m)
{
    thread_t *term;

    assert(m != NULL);

    term = thread_new(m);

    /* Identity of the event. */
    term->master = m;
    term->type = THREAD_TERMINATE;
    term->id = 0;

    /* Nothing to run and nothing to pass along. */
    term->func = NULL;
    term->arg = NULL;
    term->u.val = 0;

    thread_list_add(&m->event, term);
    return term;
}

2.4 创建不同类型的线程,并加入主线程中对应的线程链表。下面以读线程为例进行介绍

/*
 * Add new read thread.
 *
 * m:     master that will own the thread (must not be NULL).
 * func:  callback stored in the thread.
 * arg:   opaque argument stored in the thread.
 * fd:    file descriptor to watch; must lie in [0, FD_SETSIZE).
 * timer: timeout handed to timer_add_long() on top of time_now (unit is
 *        defined by that helper, not visible here).
 *
 * Returns the new thread, or NULL when fd is out of range for an fd_set
 * or is already registered in the master's read set.
 */
thread_t *
thread_add_read(thread_master_t * m, int (*func) (thread_t *)
        , void *arg, int fd, long timer)
{
    thread_t *thread;

    assert(m != NULL);

    /*
     * FD_ISSET()/FD_SET() are undefined for descriptors outside
     * [0, FD_SETSIZE); refuse them instead of touching memory beyond
     * the fd_set.
     */
    if (fd < 0 || fd >= FD_SETSIZE) {
        log_message(LOG_WARNING, "Invalid read fd [%d]", fd);
        return NULL;
    }

    if (FD_ISSET(fd, &m->readfd)) {
        log_message(LOG_WARNING, "There is already read fd [%d]", fd);
        return NULL;
    }

    thread = thread_new(m);
    thread->type = THREAD_READ;
    thread->id = 0;
    thread->master = m;
    thread->func = func;
    thread->arg = arg;
    FD_SET(fd, &m->readfd);
    thread->u.fd = fd;

    /* Compute read timeout value */
    set_time_now();
    thread->sands = timer_add_long(time_now, timer);

    /* Sort the thread: the read list is kept ordered by expiry time. */
    thread_list_add_timeval(&m->read, thread);

    return thread;
}

2.4.1 创建一个新的线程

/*
 * Obtain a thread structure for master m.
 *
 * A thread parked on the unuse list is preferred: it is detached and
 * zeroed before being handed out. Only when that list is empty is a new
 * structure allocated, in which case m->alloc is bumped.
 */
thread_t *
thread_new(thread_master_t * m)
{
    thread_t *t;

    if (m->unuse.head == NULL) {
        /* Nothing to recycle: allocate a fresh one and account for it. */
        t = (thread_t *) MALLOC(sizeof (thread_t));
        m->alloc++;
    } else {
        /* Recycle the head of the unuse list, wiping its old contents. */
        t = thread_trim_head(&m->unuse);
        memset(t, 0, sizeof (thread_t));
    }

    return t;
}

2.4.2 设置为读线程

    /* Excerpt from thread_add_read(): mark the new thread as a read thread. */
    thread->type = THREAD_READ;
    thread->id = 0;
    thread->master = m;
    /* Remember the callback and its argument. */
    thread->func = func;
    thread->arg = arg;
    /* Register fd in the master's read set and record it in the thread. */
    FD_SET(fd, &m->readfd);
    thread->u.fd = fd;

2.4.3 根据超时时间将读线程加入读线程列表中

/*
 * Insert thread into list according to its sands value.
 *
 * The new thread goes in front of the first element whose sands is not
 * earlier than its own; if no such element exists it is handed to
 * thread_list_add().
 */
void
thread_list_add_timeval(thread_list_t * list, thread_t * thread)
{
    thread_t *pos = list->head;

    /* Skip every element that expires strictly before the new thread. */
    while (pos && timer_cmp(thread->sands, pos->sands) > 0)
        pos = pos->next;

    if (!pos) {
        thread_list_add(list, thread);
        return;
    }

    thread_list_add_before(list, pos, thread);
}

2.5 取消线程,从对应类型的线程列表中去除该线程,将它设置为unused类型,并加入unused线程链表。

/*
 * Cancel thread from scheduler.
 *
 * The thread is unlinked from whichever master list its type says it is
 * on, read/write threads also drop their fd from the master's fd set, and
 * the structure is then recycled onto the unuse list as THREAD_UNUSED.
 *
 * thread: thread to cancel. NULL and already-unused threads are ignored.
 */
void
thread_cancel(thread_t * thread)
{
    /*
     * A thread that is already THREAD_UNUSED sits on the unuse list;
     * adding it again would link it twice.
     */
    if (thread == NULL || thread->type == THREAD_UNUSED)
        return;

    switch (thread->type) {
    case THREAD_READ:
        assert(FD_ISSET(thread->u.fd, &thread->master->readfd));
        FD_CLR(thread->u.fd, &thread->master->readfd);
        thread_list_delete(&thread->master->read, thread);
        break;
    case THREAD_WRITE:
        assert(FD_ISSET(thread->u.fd, &thread->master->writefd));
        FD_CLR(thread->u.fd, &thread->master->writefd);
        thread_list_delete(&thread->master->write, thread);
        break;
    case THREAD_TIMER:
        thread_list_delete(&thread->master->timer, thread);
        break;
    case THREAD_CHILD:
        /* Does this need to kill the child, or is that the
         * caller's job?
         * This function is currently unused, so leave it for now.
         */
        thread_list_delete(&thread->master->child, thread);
        break;
    case THREAD_EVENT:
    case THREAD_TERMINATE:
        /* Terminate events are queued on the event list too. */
        thread_list_delete(&thread->master->event, thread);
        break;
    case THREAD_READY:
    case THREAD_READY_FD:
    case THREAD_READ_TIMEOUT:
    case THREAD_WRITE_TIMEOUT:
    case THREAD_CHILD_TIMEOUT:
        /*
         * thread_fetch() moves timed-out threads onto the ready list
         * (their fd is already cleared there), so they must be unlinked
         * from it before being recycled.
         */
        thread_list_delete(&thread->master->ready, thread);
        break;
    default:
        break;
    }

    thread->type = THREAD_UNUSED;
    thread_add_unuse(thread->master, thread);
}

2.6 获取下一个就绪线程

/*
 * Fetch next ready thread.
 *
 * Order of service:
 *   1. a pending event thread, if any;
 *   2. a thread already sitting on the ready list;
 *   3. otherwise wait in select() (bounded by thread_compute_timer()),
 *      move every expired or fd-ready thread onto the ready list and take
 *      its head; if nothing became ready, start over.
 *
 * The selected thread is copied into *fetch and its list node is recycled
 * onto the unuse list, so the caller works on the copy.
 *
 * m:     master to schedule from (must not be NULL).
 * fetch: caller-provided storage receiving the copy.
 *
 * Returns fetch, or NULL once a THREAD_TERMINATE event is dequeued.
 */
thread_t *
thread_fetch(thread_master_t * m, thread_t * fetch)
{
    int ret, old_errno;
    thread_t *thread;
    fd_set readfd;
    fd_set writefd;
    fd_set exceptfd;
    timeval_t timer_wait;
    int signal_fd;
#ifdef _WITH_SNMP_
    timeval_t snmp_timer_wait;
    int snmpblock = 0;
    int fdsetsize;
#endif

    assert(m != NULL);

    /* Timer initialization */
    memset(&timer_wait, 0, sizeof (timeval_t));

retry:    /* When thread can't fetch try to find next thread again. */

    /* If there is event process it first. */
    if ((thread = thread_trim_head(&m->event))) {
        *fetch = *thread;

        /* If daemon hanging event is received return NULL pointer */
        if (thread->type == THREAD_TERMINATE) {
            thread->type = THREAD_UNUSED;
            thread_add_unuse(m, thread);
            return NULL;
        }
        thread->type = THREAD_UNUSED;
        thread_add_unuse(m, thread);
        return fetch;
    }

    /* If there is ready threads process them */
    if ((thread = thread_trim_head(&m->ready))) {
        *fetch = *thread;
        thread->type = THREAD_UNUSED;
        thread_add_unuse(m, thread);
        return fetch;
    }

    /*
     * Re-read the current time to get the maximum accuracy.
     * Calculate select wait timer. Take care of timeouted fd.
     */
    set_time_now();
    thread_compute_timer(m, &timer_wait);

    /* Call select function on copies, so the master's sets stay intact. */
    readfd = m->readfd;
    writefd = m->writefd;
    exceptfd = m->exceptfd;

    /* Also watch the signal fd so signals are noticed by select(). */
    signal_fd = signal_rfd();
    FD_SET(signal_fd, &readfd);

#ifdef _WITH_SNMP_
    /* When SNMP is enabled, we may have to select() on additional
     * FD. snmp_select_info() will add them to `readfd'. The trick
     * with this function is its last argument. We need to set it
     * to 0 and we need to use the provided new timer only if it
     * is still set to 0. */
    fdsetsize = FD_SETSIZE;
    snmpblock = 0;
    memcpy(&snmp_timer_wait, &timer_wait, sizeof(timeval_t));
    snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
    if (snmpblock == 0)
        memcpy(&timer_wait, &snmp_timer_wait, sizeof(timeval_t));
#endif

    ret = select(FD_SETSIZE, &readfd, &writefd, &exceptfd, &timer_wait);

    /* we have to save errno here because the next syscalls will set it */
    old_errno = errno;

    /* Handle SNMP stuff */
#ifdef _WITH_SNMP_
    if (ret > 0)
        snmp_read(&readfd);
    else if (ret == 0)
        snmp_timeout();
#endif

    /* handle signals synchronously, including child reaping */
    if (FD_ISSET(signal_fd, &readfd))
        signal_run_callback();

    /* Update current time */
    set_time_now();

    if (ret < 0) {
        if (old_errno == EINTR)
            goto retry;
        /* Real error. */
        DBG("select error: %s", strerror(old_errno));
        assert(0);
    }

    /* Timeout children */
    thread = m->child.head;
    while (thread) {
        thread_t *t;

        /* Save the successor first: t may be moved to another list. */
        t = thread;
        thread = t->next;

        if (timer_cmp(time_now, t->sands) >= 0) {
            thread_list_delete(&m->child, t);
            thread_list_add(&m->ready, t);
            t->type = THREAD_CHILD_TIMEOUT;
        }
    }

    /* Read thread. */
    thread = m->read.head;
    while (thread) {
        thread_t *t;

        t = thread;
        thread = t->next;

        if (FD_ISSET(t->u.fd, &readfd)) {
            assert(FD_ISSET(t->u.fd, &m->readfd));
            FD_CLR(t->u.fd, &m->readfd);
            thread_list_delete(&m->read, t);
            thread_list_add(&m->ready, t);
            t->type = THREAD_READY_FD;
        } else {
            if (timer_cmp(time_now, t->sands) >= 0) {
                FD_CLR(t->u.fd, &m->readfd);
                thread_list_delete(&m->read, t);
                thread_list_add(&m->ready, t);
                t->type = THREAD_READ_TIMEOUT;
            }
        }
    }

    /* Write thread. */
    thread = m->write.head;
    while (thread) {
        thread_t *t;

        t = thread;
        thread = t->next;

        if (FD_ISSET(t->u.fd, &writefd)) {
            /*
             * Check the master's set, mirroring the read path; testing
             * the select() result again would repeat the enclosing
             * condition and could never fail.
             */
            assert(FD_ISSET(t->u.fd, &m->writefd));
            FD_CLR(t->u.fd, &m->writefd);
            thread_list_delete(&m->write, t);
            thread_list_add(&m->ready, t);
            t->type = THREAD_READY_FD;
        } else {
            if (timer_cmp(time_now, t->sands) >= 0) {
                FD_CLR(t->u.fd, &m->writefd);
                thread_list_delete(&m->write, t);
                thread_list_add(&m->ready, t);
                t->type = THREAD_WRITE_TIMEOUT;
            }
        }
    }
    /* Exception thread. */
    /*... */

    /* Timer update. */
    thread = m->timer.head;
    while (thread) {
        thread_t *t;

        t = thread;
        thread = t->next;

        if (timer_cmp(time_now, t->sands) >= 0) {
            thread_list_delete(&m->timer, t);
            thread_list_add(&m->ready, t);
            t->type = THREAD_READY;
        }
    }

    /* Return one event. */
    thread = thread_trim_head(&m->ready);

#ifdef _WITH_SNMP_
    run_alarms();
    netsnmp_check_outstanding_agent_requests();
#endif

    /* There is no ready thread. */
    if (!thread)
        goto retry;

    *fetch = *thread;
    thread->type = THREAD_UNUSED;
    thread_add_unuse(m, thread);

    return fetch;
}

2.7 子线程处理:遍历子线程链表,取出对应的子线程并放入就绪线程链表。

/*
 * Synchronous signal handler that reaps child processes.
 *
 * v:   the thread master, passed as an opaque pointer.
 * sig: signal number (not used in the body).
 *
 * Every child reported by waitpid(WNOHANG) is looked up on the master's
 * child list; a matching thread receives the wait status and is moved to
 * the ready list as THREAD_READY. The function returns once waitpid()
 * reports nothing more to collect or fails with ECHILD.
 */
void
thread_child_handler(void * v, int sig)
{
    thread_master_t *m = v;
    thread_t *cur;
    pid_t reaped;
    int status = 77;

    /*
     * One list walk per reaped child: quadratic in theory, but the child
     * list is expected to hold only a few entries.
     */
    for (;;) {
        reaped = waitpid(-1, &status, WNOHANG);
        if (reaped == 0)
            return;

        if (reaped == -1) {
            if (errno == ECHILD)
                return;
            DBG("waitpid error: %s", strerror(errno));
            assert(0);
            continue;
        }

        /* The walk stops at the first match, so unlinking cur is safe. */
        for (cur = m->child.head; cur; cur = cur->next) {
            if (cur->u.c.pid != reaped)
                continue;

            thread_list_delete(&m->child, cur);
            thread_list_add(&m->ready, cur);
            cur->u.c.status = status;
            cur->type = THREAD_READY;
            break;
        }
    }
}

2.8 线程调用

/*
 * Run a fetched thread: stamp it with an id from thread_get_id() and
 * invoke its callback with the thread itself as the only argument.
 * The callback's return value is discarded.
 */
void
thread_call(thread_t * thread)
{
    thread->id = thread_get_id();

    thread->func(thread);
}

2.9 启动调度器

/*
 * Scheduling loop of the global master.
 *
 * Registers thread_child_handler() for SIGCHLD, then repeatedly fetches
 * one runnable thread and executes it until thread_fetch() returns NULL.
 */
void
launch_scheduler(void)
{
    thread_t current;

    signal_set(SIGCHLD, thread_child_handler, master);

    for (;;) {
        /* Work through the master's queues; NULL ends the loop. */
        if (!thread_fetch(master, &current))
            break;

        /* Run until error, used for debugging only */
#ifdef _DEBUG_
        if ((debug & 520) == 520) {
            debug &= ~520;
            thread_add_terminate_event(master);
        }
#endif
        thread_call(&current);
    }
}
原文地址:https://www.cnblogs.com/davidwang456/p/3543937.html