Posix信号量

  提供不同进程间或给定的一个特定进程的不同线程间的同步手段。

  1. Posix有名信号量,由IPC名字标识,通常指代文件系统中的某个文件,但不要求真正存放在文件系统的某个文件中(如果信号量的实现用到了映射文件);随内核持续,可用于进程或线程间的同步
  2. Posix基于内存的信号量,放在共享内存中,同步多线程时,可以放到该多线程所属进程空间里;如果是同步多进程,那就需要把信号灯放入到共享内存中(方便多个进程访问)。随进程持续

  有名信号灯和基于内存的信号灯,具体区别体现在创建和销毁两个函数。有名信号灯使用sem_open和sem_close函数。基于内存的信号灯使用sem_init和sem_destroy函数。sem_init的参数可以控制是同步多线程,还是多进程;且该函数只能调用1次,因为调用后信号灯就存在了( 内存指针存在)。一般,使用基于内存的信号灯同步同进程多线程,使用有名信号灯同步多进程

  1. 创建(create)一个信号量。要求调用者指定初始值。通常初始值为0或1的称作二值信号量,初始值为N(N>=0)的称作计数信号量,N指示可用的资源数(比如说缓冲区个数)。
  2. 等待(wait)一个信号量。该操作测试信号量的值。如果它的值大于0,则将值减1;如果它的值小于或等于0,则阻塞等待其值变为大于0,然后将值减1。
  3. 挂起(post)一个信号量。该操作将信号量的值加1。

生产者与消费者模型:

  只有一个生产者和消费者线程,而且它们只共享一个缓冲区。它们之间的行为被彼此约束: 生产者只有在缓冲区中的数据被消费者处理后(缓冲区为空)才能放入新的数据。消费者只有在缓冲区已被生产者放入数据后(缓冲区不为空)才能处理数据。对生产者而言,缓冲区为空表示有资源可用。对消费者而言,缓冲区不为空表示有资源可用。

  需要使用两个信号量get和put,因为初始缓冲区为空,所以我们把put的初始值置为1表示生产者的可用资源数为1,把get的初始值置为0表示消费者的初始可用资源数为0,消费者先运行

互斥锁、条件变量区信号量别

  1. 互斥锁必须总是由锁住他的线程解锁,信号量的挂出不必由执行过它的等待操作的同一线程执行
  2. 互斥锁要么被锁住要没被解开
  3. 既然信号量有一个与之关联的状态(它的计数值)那么信号量的挂出总是被记住,然而向一个条件变量上发送信号时,如果没有线层等待该条件变量那么信号将丢失

  互斥锁和信号量是为线程间的同步机制说明。信号量为进程间的同步机制说明,这些进程可能共享也可能不共享某个内存区域。但信号量也可用同步线程,互斥锁条件变量也可用于同步进程。

有名信量

  一个信号量的状态可能包含在它本身的sem_t数据类型中,还包含其他的信息(如:文件描述符),知道该标识符的任何进程都可以访问该信号量,它的整数标识符只是告诉内核具体引用哪个信号量

  即使对于某个特定的名字sem_open调用在每个进程中可能返回不同的指针,使用该指针的信号量函数(sem_post,sem_wait)引用的任然是同一个信号量

sem_open

  创建并初始化信号灯,返回值:成功时,返回信号灯的指针,错误返回SEM_FAILED

#include <semaphore.h>
sem_t * sem_open(const char * name,int oflag,mode_t mode,unsigned int value);
sem_t * sem_open(const char * name,int oflag);
  1. name是给信号灯指定一个名字
  2. oflag的值为O_CREAT,表示如果信号灯不存在,创建信号灯,若存在并不返回一个错误;为O_CREAT|O_EXCL,如果信号灯不存在报错。
  3. 后面两个参数,只有新建信号灯时使用。mode为信号灯的权限(0644),value为信号灯的值,该值不能超过SEM_VALUE_MAX(至少为32767),二值信号量的初值为1,计数信号量的初值往往大于一。

  默认都具备读写权限不需要指定O_RNONLY 和O_WRONLY,因为信号量的挂出与等待操作都要改变信号量的值,不具备读写权限的信号量会导致sem_open返回EACCES(访问权限不符合)

sem_close

  进程终止时(自愿exit,_exit或非自愿接收到一个信号)内核对其上仍打开的所有有名信号量自动执行信号量关闭操作。

#include <semaphore.h>
int sem_close(sem_t * sem)

  成功时返回0;失败,-1

  每个信号灯有一个引用计数器记录当前打开次数.关闭一个信号灯并没有将它从系统中删除因为他是随内核持续的,而是信号灯引用计数减1,即时当前没有进程打开信号量,它的值仍然保持。

sem_unlink

  信号灯引用计数为大于0时,name从系统中删除,而其信号量的析构要等到最后一个sem_close发生。

  返回值:成功时,返回0,失败,-1,需要显示调用

#include <semaphore.h>
int sem_close(const char *name)

sem_wait/sem_trywait

  sem_wait测试所指信号量的值,若大于0将它减一立刻返回,如果等于0,调用的线程会被投入睡眠,直到该值大于0这时在将它减一,随后返回。(测试减一是原子操作)

  sem_trywait与sem_wait的区别是:当所指信号量的值为0时,不将调用线程投入睡眠。返回EAGAIN错误。

  如果某个信号量被中断。sem_wait可能尽早的返回,返回的错误为EINTR。

#include <semaphore.h>
int sem_wait(sem_t *sem)
int sem_trywait(sem_t *sem)

sem_getvalue

  获得信号灯当前值。参数:sem为信号灯指针,valp为信号灯的值

  返回值:返回0或某个负数。如果当前信号量以上锁,其绝对值等于等待该信号量解锁的线程数。

#include <semaphore.h>
int sem_getvalue(sem_t *sem,int *valp)

sem_post

  返回值:成功时,返回0,失败,-1

  任何线程都可以挂出一个信号(比如值由0增加为1),即使当前没有线程在该信号上阻塞;然而如果某个线程调用pthread_cond_signal,当时没有任何线程阻塞在pthread_cond_wait上时,那么发送相应的条件变量的信号将丢失。

  在各种同步技巧(互斥锁、条件变量、读写锁、信号量)中,能够在信号处理程序中安全调用的唯一函数时sem_post。

#include <semaphore.h>
int sem_post(sem_t *sem)

总结:

  1. 当某个进程持有信号量的锁时进程没有释放他就终止,内核并不给信号量解锁,这与锁不同,当进程持有某个记录的锁没有释放就终止,内核自动释放。
  2. 能够从信号处理程序中安全调用的唯一函数时sem_post,互斥锁是为上锁而优化,条件变量为等待而优化,信号量即可用于上锁,也可用于等待,01信号量就相当于互斥锁
  3. 如果一个以上的线程在信号灯上等待,sem_post将唤醒其中一个(优先级最高的或最早等待的线程),如果没有线程在等待,信号灯计数器被加一。
  4. 假如有个代码段,期望最多有两个线程同时在里面执行(其他线程必须等待),可以使用一个信号灯而不需要附加任何状态,初始化信号灯值为2,然后再代码段开始调用sem_wait,结尾处调用sem_post,然后两个线程可以再信号登上等待而不被阻塞。 
  5. sem_open,在父进程中打开的信号量在子进程中仍然打开

  生产者与消费者问题:

#include "unpipc.h"

#define BUFF 10
#define SEM_MUTEX "/tmpmutex"
#define SEM_NEMPTY "/tmpnempty"
#define SEM_NSTORED "/tmpnstored"

struct shared
{
    int buffer[BUFF];
    sem_t *mutex,*nempty,*nstored;
}shared;

void *produce(void *arg)
{
    int i;
    for(i=0;i<BUFF;++i)
    {
        sem_wait(shared.nempty);
        sem_wait(shared.mutex);

        shared.buffer[i%BUFF]=i;

        sem_post(shared.mutex);
        sem_post(shared.nstored);
    }
    return NULL;
}

void *consume(void *arg)
{
    int i;
    for(i=0;i<BUFF;++i)
    {
        sem_wait(shared.nstored);//这两个wait交换会造成死锁
        sem_wait(shared.mutex);

        printf("shared.buffer[%d]= %d
",i,shared.buffer[i%BUFF]);

        sem_post(shared.mutex);
        sem_post(shared.nempty);
    }
    return NULL;
}

int main()
{
    pthread_setconcurrency(2);
    pthread_t pid_consume,pid_produce;

    /*每个信号量都需要正确的初始化,如果先前建立的信号里因本程序终止没有
     * 删除,那么,可以在sem_open之前调用sem_unlink,并忽略任何错误,也
     * 可以指定O_EXCL,检查是否返回EEXIST错误,若是,调用sem_unlink,并
     * 且再次调用sem_open。如果想验证本程序只有一个副本在运行,可以调用
     * fcntl文件锁
     */
    shared.mutex=sem_open(SEM_MUTEX,O_CREAT|O_EXCL,FILE_MODE,1);
    shared.nempty=sem_open(SEM_NEMPTY,O_CREAT|O_EXCL,FILE_MODE,BUFF);
    shared.nstored=sem_open(SEM_NSTORED,O_CREAT|O_EXCL,FILE_MODE,BUFF);

    pthread_create(&pid_produce,NULL,produce,NULL);
    pthread_create(&pid_consume,NULL,consume,NULL);

    pthread_join(pid_produce,NULL);
    pthread_join(pid_consume,NULL);

    sem_unlink(SEM_MUTEX);
    sem_unlink(SEM_NEMPTY);
    sem_unlink(SEM_NSTORED);

    exit(0);
}

基于内存的信号量

#include <semaphore.h>

int sem_init(sem_t * sem,int shared,unsigned int value);
int sem_destroy(sem_t * sem)

  sem为指向应用程序必须分配的sem_t变量,shared是指同步多线程还是多进程(0:同一进程间的线程共享;其他:进程间共享,该信号量必须放在在某个类型的共享内存中,而即将使用他的所有进程都要能访问共享内存),value为信号量值

  返回值:成功时,不返回0,失败时,返回-1

注意:

  1. 必须保证该函数只调用一次,对一个已经初始化的信号量调用sem_init结果是未定义的。
  2. sem_open与sem_init:前者返回一个指向某个sem_t变量的指针,该变量由sem_open函数本身分配并初始化;后者的第一个参数时指向某个sem_t变量的指针,该变量由调用者分配,然后由sem_int初始化
  3. 一个基于内存的信号量,只有sem_init的参数指向的位置可用于访问该信号量,使用他的sem_t数据类型副本访问时为定义的结果
  4. 它的持续性取决于信号量存放的内存区域类型,只要某个基于内存的信号量的内存区保持有效,该信号量就一直存在,随进程持续,进程终止其消失
  5. fork的子进程通常不共享父进程内存空间,所以不能用sem_init(因为只有sem_init指向的位置才可以访问该信号量)

多个生产者单个消费者

#include "unpipc.h"
#include <pthread.h>
#include <string.h>

#define BUFF 10

struct shared
{
    struct buff
    {
        char data[BUFFSIZE];
        size_t n;
    }buffer[BUFF];
    sem_t mutex,nempty,nstored;
}shared;

int fd;

void *produce(void *arg)
{
    int i=0;
    while(1)
    {
        //printf("fd is:%d
",fd);
        sem_wait(&shared.nempty);

        sem_wait(&shared.mutex);
        //如果数据缓冲区要是在一个连表上维护,此处是链表中移除某个缓冲区的
        //地方,把该操作放在临界区是避免生产者消费者对链表操作发生冲突
        sem_post(&shared.mutex);

        shared.buffer[i].n=read(fd,shared.buffer[i].data,BUFFSIZE);
        if(shared.buffer[i].n==0)//读取结束
        {
            sem_post(&shared.nstored);
            return NULL;
        }

        if(++i>=BUFF)
            i=0;//循环缓冲

        sem_post(&shared.nstored);
    }
    return NULL;
}

void *consume(void *arg)
{
    int i=0;
    while(1)
    {
        sem_wait(&shared.nstored);

        sem_wait(&shared.mutex);
        //同上
        sem_post(&shared.mutex);

        if(shared.buffer[i].n==0)
            return NULL;

        write(1,shared.buffer[i].data,shared.buffer[i].n);
        if(++i>BUFF)
            i=0;

        sem_post(&shared.nempty);
    }
    return NULL;
}

int main()
{
    printf("请输入文件名
");
    char pathname[100];
    fgets(pathname,100,stdin);
    int len=strlen(pathname);
    if(pathname[len-1]=='
')
        pathname[len-1]='';
    fd=open(pathname,O_RDONLY);
    //printf("pathname:%s",pathname);
    //printf("fd is main: %d
",fd);

    sem_init(&shared.mutex,0,1);
    sem_init(&shared.nempty,0,BUFF);
    sem_init(&shared.nstored,0,0);

    pthread_setconcurrency(2);

    pthread_t pid_produce,pid_consume;

    pthread_create(&pid_consume,NULL,consume,NULL);
    pthread_create(&pid_produce,NULL,produce,NULL);
    //pthread_create(&pid_consume,NULL,consume,NULL);

    pthread_join(pid_produce,NULL);
    pthread_join(pid_consume,NULL);

    sem_destroy(&shared.mutex);
    sem_destroy(&shared.nempty);
    sem_destroy(&shared.nstored);

    exit(0);
}

代码github:https://github.com/tianzengBlog/test/tree/master/ipc2/sem

多个生产者多个消费者

  在有多个消费者的情形下,消费者验证数据使用的数组下标和数据值都变成共享的,需要使用互斥锁保护。为了方便,我们还是只使用一个互斥锁mutex。为消费者我们新增了3个变量nget、ngetval和nconsumers。

  要注意的还是生产者线程终止时的行为,我们新增了一个挂起nstored信号量的操作。因为所有的数据都验证完时nstored值为0,所有的消费者线程都在阻塞等待nstored信号量,如果不挂起nstored信号量,那么所有的消费者线程都会永远阻塞。

#include "unpipc.h"
 
#define NBUFF             10
#define MAXNTHREADS     100
 
int nitems, nproducers, nconsumers; 
 
struct {
    int buff[NBUFF];
    int nput;
    int nputval;
    int nget;
    int ngetval;
    sem_t mutex, nempty, nstored;
} shared;
 
void *produce(void *);
void *consume(void *);
 
int main(int argc, char **argv)
{
    int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
    pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
 
    if (argc != 4)
        err_quit("usgae: prodcons4 <#items> <#producers> <#consumers>");
 
    nitems = atoi(argv[1]);
    nproducers = min(atoi(argv[2]), MAXNTHREADS);
    nconsumers = min(atoi(argv[3]), MAXNTHREADS);
 
    Sem_init(&shared.mutex, 0, 1);
    Sem_init(&shared.nempty, 0, NBUFF);
    Sem_init(&shared.nstored, 0, 0);
 
    Pthread_setconcurrency(nproducers + nconsumers);
    for (i = 0; i < nproducers; i++) {
        prodcount[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++) {
        conscount[i] = 0;
        Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
    }
 
    for (i = 0; i < nproducers; i++) {
        Pthread_join(tid_produce[i], NULL);
        printf("producer count[%d] = %d
", i, prodcount[i]);
    }
    for (i = 0; i < nconsumers; i++) {
        Pthread_join(tid_consume[i], NULL);
        printf("consumer count[%d] = %d
", i, conscount[i]);
    }
    
    Sem_destroy(&shared.mutex);
    Sem_destroy(&shared.nempty);
    Sem_destroy(&shared.nstored);
 
    exit(0);
}
 
void* produce(void *arg)
{
    int i;
    
    for ( ; ; ) {
        Sem_wait(&shared.nempty); //A
        Sem_wait(&shared.mutex);
        if (shared.nput >= nitems) {
            Sem_post(&shared.nstored); //重要!
            Sem_post(&shared.nempty); //A
            Sem_post(&shared.mutex);
            return NULL;
        }
        
        shared.buff[shared.nput % NBUFF] = shared.nputval;
        shared.nput++;
        shared.nputval++;
        Sem_post(&shared.mutex);
        Sem_post(&shared.nstored); //B
        *((int *)arg) += 1;
    }
    
}
 
void* consume(void *arg)
{
    int i;
 
    for ( ; ; ) {
        Sem_wait(&shared.nstored); //B
        Sem_wait(&shared.mutex);
        if (shared.nget >= nitems) {
            Sem_post(&shared.nstored); //B
            Sem_post(&shared.mutex);
            return NULL;
        }
        
        i = shared.nget % NBUFF;
        if (shared.buff[i] != shared.ngetval)
            printf("error: buff[%d] = %d
", i, shared.buff[i]);
        shared.nget++;
        shared.ngetval++;
        Sem_post(&shared.mutex);
        Sem_post(&shared.nempty); //A
        *((int *)arg) += 1;
    }
    
}

信号量的限制

  1. 在<unistd,h>中定义,也可运行时通过sysconf获取
  2. SEM_NSEMS_MAX:一个进程可以最大打开的信号量数(posix要求至少为256)
  3. SEM_VALUE_MAX:一个信号量的最大值(posix至少要求为32767)

  可以用FIFO实现信号量也可以用内存映射I/O实现信号量。

原文地址:https://www.cnblogs.com/tianzeng/p/9314463.html