基于CAS实现无锁结构

杨乾成

一、题目要求

  基于CAS(Compare and Swap)实现一个无锁结构,可考虑queue,stack,hashmap,freelist等。

  • 能够支持多个线程同时访问该结构
  • 不能有任何锁操作,且操作是线程安全的
  • 对上述的内存单元进行管理,至少malloc与free一次。

二、数据结构

  看到题目有一说一,不知道怎么下手,那就google一下先。稍微了解了一下CAS,原准备使用STL模板的队列,后来发现实现题目要求似乎得再Queue的插入和删除函数里面具体实现,于是决定自己实现吧(省事失败)。

  先简要介绍一下CAS算法,我先po一下wiki百科上的代码介绍:

bool compare_and_swap (int *accum, int *dest, int newval)
{
  if ( *accum == *dest ) {
      *dest = newval;
      return true;
  }
  return false;
}

  这段代码的思想大概就是看看*dest里面的值和*accum里面的值是不是一样,如果一样,则将新数据写入。个人觉得用数据库的“脏数据”概念来形容很合适:即不允许往存有脏数据的缓冲区写入数值。在数据库中,常用的方法是两段锁,但是这里不让加锁。突然想起来大二上学期修JAVA里面讲到的同步信号量,即每当对数据结构进行操作时,就对=改变信号量的值,比如只有信号量值为0的时候才允许更改变量,修改时使用者将信号量置一,但是仔细一想,这似乎也是锁的思想。

  遇事不决,再看看,害,现成的CAS都在这里,我直接用不就完事了。回归正题,CAS思想总结如下:线程操作钱保存当前版本号,也就是记录当前数据的状态的标志;当完成操作时,比较自己持有的版本号与当前版本号是否相符,相符则允许写入,否则获取的新的版本号再来一轮。就好像是大家都听说身边有个好看的姑娘处于“单身版本”,各自回去准备准备,第一个到的获得了优先权,后来的只能记住另一个“单身”的姑娘,再回家准备展开竞争,直到自己完成任务。

三、代码实现

#include<stdio.h>
#include<pthread.h>
#include <unistd.h>
#include <assert.h>
#include<stdlib.h>
#define THREAD_NUMBER 4
//队节点
typedef struct QNode
{
    int data;
    struct QNode *next;
}QNode, *QueuePtr;
//链队结构
typedef struct LinkQueue
{
    QueuePtr front;
    QueuePtr rear;
}LinkQueue;

int is_Empty(LinkQueue *q);
void *pushThread(void *arg);
void *popThread(void *arg);
void init_Queue(LinkQueue *q);void cas_push(LinkQueue *q, int e);
void cas_push(LinkQueue *q, int e);
int cas_pop(LinkQueue *q, int *e);
void show(LinkQueue *q);

int main()
{
    LinkQueue que;
    init_Queue(&que);
    int i;

    //创建四个进程
    pthread_t threadArr[THREAD_NUMBER];    //线程ID数组
    for (i = 0; i < THREAD_NUMBER; ++i)
    {
        pthread_create(&threadArr[i], NULL, pushThread, (void *)&que);    //设置线程入口
    }
    //等待线程执行结束
    for (i = 0; i < THREAD_NUMBER; ++i)
    {
        pthread_join(threadArr[i], NULL);
    }
    show(&que);

    //创建四个进线程然后执行出队操作
    for (i = 0; i < THREAD_NUMBER; ++i)
    {
        pthread_create(&threadArr[i], NULL, popThread, (void *)&que);
    }
    for (i = 0; i < THREAD_NUMBER; ++i)
    {
        pthread_join(threadArr[i], NULL);
    }
    exit(EXIT_SUCCESS);
 }

void *pushThread(void *arg)
{
    printf("进程%ld开始创建队列:
",pthread_self());
    LinkQueue *quePtr =(LinkQueue *)arg;
    int i;
    for(i= 0;i<5;++i)
    {       
        cas_push(quePtr, i);    
    }
    printf("
");
    printf("完成队列创建操作!
");
    pthread_exit(NULL);     //结束当前进程
}

void *popThread(void *arg)
{
    printf("%ld开始出队操作:
",pthread_self());
    LinkQueue *quePtr=(LinkQueue *)arg;
    int temp;
    int res;
    while(1)
    {
        res = cas_pop(quePtr,&temp);
        if(!res)
        {
            break;    
        }
        printf("PID:%ld	pop%d
",pthread_self(),temp);
    }
    
}

//初始化队列
void init_Queue(LinkQueue *q)
{
    q->front=q->rear=(QNode *)malloc(sizeof(QNode));
    q->front->next=NULL;
}

//判断队列是否为空
int is_Empty(LinkQueue *q)
{
    if(q->front->next ==NULL)
    {
        return 1;
    }
    return 0;
}


void cas_push(LinkQueue *q, int e) 
{
     QueuePtr newNode = (QueuePtr)malloc(sizeof(QNode));
     newNode->data = e;
     newNode->next = NULL;
     QueuePtr tmp; 
     //不断尝试将新节点进队
     do
     {
         // sleep(1);
          tmp = q->rear;
     }while (!__sync_bool_compare_and_swap((long *)(&(tmp->next)), NULL, (long)newNode));    //判断此时队尾是否被改变
        //就是在这里控制对队列的原子化操作
     sleep(1);
     q->rear = newNode;
} 
    
int cas_pop(LinkQueue *q, int *e)
{
     QueuePtr tmp;
     do
     {
          if (is_Empty(q))
          {
               return(0);     //队列已空,结束
          } 
          tmp = q->front->next; 
     } while (!__sync_bool_compare_and_swap((long *)(&(q->front->next)), (long)tmp, (long)tmp->next));
     sleep(1);     //通过休眠一秒,模拟线程操作时间,防止过快操作导致结果不明显
     *e = tmp->data;
     free(tmp); 
     return 1;
}

void show(LinkQueue *q) 
{
     printf("Current queue:
"); 
     QueuePtr tmp = q->front->next; 
     while (tmp) 
     {
          printf("%d ", tmp->data);
          tmp = tmp->next; 
     } 
     printf("
");
}

上面就是这次的代码实现。

四、运行结果

使用指令:

gcc CAS_Structure.c -o result.out -lpthread

将源码进行编译,此处要注意后面一定要加上-lpthread,不然会提示“pthread.h”头文件报错问题。

我创建了四个进程,竞争往队尾插入数据。

  然后竞争删除完队列的数据,直到队列为空。

  从截图可以看出来,四个进程竞争着抢占处理器,直到完成创建和删除节点的任务。这里面要注意的是,因为实际处理就是插入节点,刚开始做的时候我没有插入sleep语句,所以最后的结果看起来像是顺序完成,所以要模拟实验,一定要注意加上sleep语句模拟进程处理的时间,防止第二个进程还没创建完,第一个进程早已完成了所有事务。

原文地址:https://www.cnblogs.com/blogMorningStar/p/12041033.html