redis的多线程

初始化三类线程. 这三类线程被认为是后台执行.不影响主线程

  1. BIO_CLOSE_FILE . 关闭重写之前的aof文件.
  2. BIO_AOF_FSYNC . 定时刷新数据到磁盘上.
  3. BIO_LAZY_FREE . 惰性删除过期时间数据

redis为了保证其高效.一些比较耗时的动作会起线程或者进程来完成.不会阻塞在业务主线程上.】

使用多线程的特点

  • 创建3个线程.这个三个线程的功能互不影响
  • 每个线程都有一个工作队列.主线程生产任务放到任务队里.这三个线程消费这些任务.
  • 任务队列和取出消费的时候都得加锁.防止竞争
  • 使用条件变量来等待任务.以及通知
// 存放工作的队列
static list *bio_jobs[REDIS_BIO_NUM_OPS];

bio_jobs是一个双端链表结构

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;

    pthread_mutex_lock(&bio_mutex[type]);

    // 将新工作推入队列
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;

    pthread_cond_signal(&bio_condvar[type]);

    pthread_mutex_unlock(&bio_mutex[type]);

当有任务的时候.先把任务丢到redis工作队列里.这里记得加锁

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        redisLog(REDIS_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
            continue;
        }

        /* Pop the job from the queue. 
         *
         * 取出(但不删除)队列中的首个任务
         */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;

        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        // 执行任务
        if (type == REDIS_BIO_CLOSE_FILE) {
            close((long)job->arg1);

        } else if (type == REDIS_BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);

        } else {
            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
        }

        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        // 将执行完成的任务从队列中删除,并减少任务计数器
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}
  • 操作前先上锁
  • 从工作任务里取任务
  • 解锁
  • 执行业务逻辑
  • 执行完上锁.重新pthread_cond_wait

条件变量

条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:

  • 一个线程等待"条件变量的条件成立"而挂起;
  • 另一个线程使"条件成立"(给出条件成立信号)。

==为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起==

pthread_cond_wait原理

就是说pthread_cond_wait(pthread_cond_t cond, pthread_mutex_t mutex)函数传入的参数mutex用于保护条件,因为我们在调用pthread_cond_wait时,如果条件不成立我们就进入阻塞,但是进入阻塞这个期间,如果条件变量改变了的话,那我们就漏掉了这个条件。因为这个线程还没有放到等待队列上,所以调用pthread_cond_wait前要先锁互斥量,即调用pthread_mutex_lock()。

==pthread_cond_wait在把线程放进阻塞队列后,自动对mutex进行解锁,使得其它线程可以获得加锁的权利。这样其它线程才能对临界资源进行访问并在适当的时候唤醒这个阻塞的进程。当pthread_cond_wait返回的时候又自动给mutex加锁==

原文地址:https://www.cnblogs.com/u013533289/p/11687937.html