多线程计数器——原子操作

众所周知,多线程下计数存在着计数不对的问题。这个问题的根源在于多个线程对同一个变量能够同一时候訪问(改动)。这样就造成了改动后的结果不一致。

         首先在这里先强调一点。volatile keyword并不能提供多线程安全訪问。由于有volatie修饰的变量。每次操作时遵循以下动作: 

从内存取值 ---> 放入寄存器 ---> 操作 --->写回内存 

这几个步骤不是原子的操作在随意两个步骤之间都可能被其它的线程中断,所以不是线程安全。具体内容參见

http://blog.csdn.net/littlefang/article/details/6103038

LINUX

         多线程訪问同一个变量,大多数程序猿最先想到的就是利用锁制造处一片线程安全的区域在该区域内,做线程安全的操作。

         

<span style="white-space:pre">	</span>{
                   Locktem(&mutx);
                   ++cunt;
	}

要知道如上的代码运行花费的时间至少是仅仅做++cunt操作的10倍。

这样就造成了代码的效率减少。

通过以上的小样例看出加锁操作并非一种高效的线程计数机制。转而就要向操作系统寻求帮助了。

在linux操作系统中仅仅要包括<asm/atomic.h><asm/bitops.h>就能够实现内核级的原子操作,全然能满足我们的需求。

下面内容引自 http://www.linuxidc.com/Linux/2011-06/37402.htm

可是在Linux2.6.18之后,删除了<asm/atomic.h><asm/bitops.h>GCC提供了内置的原子操作函数。更适合用户态的程序使用。如今atomic.h在内核头文件里,不在gcc默认搜索路径下,即使像以下这样强行指定路径。还是会出现编译错误。

 

#include</usr/src/linux-headers-2.6.27-7/include/asm-x86/atomic.h> 

gcc4.1.2提供了__sync_*系列的built-in函数,用于提供加减和逻辑运算的原子操作。

 

能够对1,2,48字节长度的数值类型或指针进行原子操作。其声明例如以下

 

type __sync_fetch_and_add (type *ptr, type value, ...)  

type __sync_fetch_and_sub (type *ptr, type value, ...)  

type __sync_fetch_and_or (type *ptr, type value, ...)  

type __sync_fetch_and_and (type *ptr, type value, ...)  

type __sync_fetch_and_xor (type *ptr, type value, ...)  

type __sync_fetch_and_nand (type *ptr, type value,...)  

          { tmp =*ptr; *ptr op= value; return tmp; }  

          { tmp =*ptr; *ptr = ~tmp & value; return tmp; }  // nand  

 

type __sync_add_and_fetch (type *ptr, type value, ...)  

type __sync_sub_and_fetch (type *ptr, type value, ...)  

type __sync_or_and_fetch (type *ptr, type value, ...)  

type __sync_and_and_fetch (type *ptr, type value, ...)  

type __sync_xor_and_fetch (type *ptr, type value, ...)  

type __sync_nand_and_fetch (type *ptr, type value,...)  

          { *ptr op=value; return *ptr; }  

          { *ptr =~*ptr & value; return *ptr; }   //nand 

这两组函数的差别在于第一组返回更新前的值,第二组返回更新后的值,以下的演示样例引自这里http://www.linuxidc.com/Linux/2011-06/37403.htm

 


对于使用atomic.h的老代码。能够通过宏定义的方式。移植到高内核版本号的linux系统上,比如

 

#define atomic_inc(x) __sync_add_and_fetch((x),1)  

#define atomic_dec(x) __sync_sub_and_fetch((x),1)  

#define atomic_add(x,y) __sync_add_and_fetch((x),(y))  

#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) 

 

以上内容引自 http://www.linuxidc.com/Linux/2011-06/37402.htm

 

#include <stdio.h>  
#include <pthread.h>  
#include <stdlib.h>  
 
static int count = 0;  
 
void *test_func(void *arg)  
{  
        int i=0;  
       for(i=0;i<20000;++i){  
               __sync_fetch_and_add(&count,1);  
        }  
        returnNULL;  
}  
 
int main(int argc, const char *argv[])  
{  
        pthread_tid[20];  
        int i =0;  
 
       for(i=0;i<20;++i){  
               pthread_create(&id[i],NULL,test_func,NULL);  
        }  
 
       for(i=0;i<20;++i){  
               pthread_join(id[i],NULL);  
        }  
 
       printf("%d
",count);  
        return0;  
} 

基于GCC编译器,我们能够自定义实现一个多线程安全的原子计数器。实现代码例如以下

my_stomic.hpp

#ifndefMY_ATOMIC_H
#defineMY_ATOMIC_H
 
class my_atmic
{
public:
    volatile int m_coun;
    explicit my_atmic(int cunn)
        :m_coun(cunn)
    {}
    ~my_atmic()
    {}
    my_atmic& operator++()
    {
       __sync_fetch_and_add(&m_coun,1);
        return *this;
    }
    my_atmic& operator--()
    {
        __sync_fetch_and_sub(&m_coun,1);
        return *this;
    }
    my_atmic operator ++(int)
    {
        my_atmic ret(this->m_coun);
        ++*this;
        return ret;
    }
    my_atmic operator--(int)
    {
        my_atmic ret(this->m_coun);
        --*this;
        return ret;
    }
    my_atmic& operator+=(constmy_atmic& m)
    {
       __sync_fetch_and_add(&m_coun,m.m_coun);
        return *this;
    }
    my_atmic& operator +=(int m)
    {
        __sync_fetch_and_add(&m_coun,m);
        return *this;
    }
    my_atmic& operator-=(constmy_atmic& m)
    {
       __sync_fetch_and_sub(&m_coun,m.m_coun);
        return *this;
    }
    my_atmic& operator -=(int m)
    {
        __sync_fetch_and_sub(&m_coun,m);
        return *this;
    }
    my_atmic& operator=(const my_atmic&m)
    {
        this->m_coun = m.m_coun;
        return *this;
    }
    my_atmic& operator=(int m)
    {
        this->m_coun = m;
        return *this;
    }
private:
    my_atmic(){}
    my_atmic(const my_atmic& );
   
};
#endif //MY_ATOMIC_H


測试代码 main.cpp

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<getopt.h>
#include <pthread.h>
 
 
 
#include"my_atomic.h"
intthread_num=100;
 
typedef struct_Count
{
        unsigned int m;
        unsigned int n;
} Count;
typedef struct_Thread
{
        pthread_t thread_id;
        Count count;
} Thread;
 
Thread* thread;
 
 
void*atomic_start( void* count);
 
my_atmic y(0);
 
int z = 0;
int main(intargc, char** argv)
{
 
 thread=(Thread*)malloc(thread_num*sizeof(Thread));
       memset(thread,0,thread_num*sizeof(Thread));
 
        int i;
        for(i=0;i<thread_num;i++)
        {
               if(pthread_create(&((thread+i)->thread_id),NULL,atomic_start,&((thread+i)->count)))
                {
                        printf("threadcreate faild: %d",i--);
                }
        }
 
        for(int i=0;i<100;i++)
        {
               printf("%d
",y.m_coun);
                usleep(1);
        }
 
        return 0;
}
 
 
 
void*atomic_start( void* count)
{
        pthread_t thread_id=pthread_self();
        pthread_detach(thread_id);
 
        int i;
        for(i=1000;i>0;i--)
        {
            //atomic_inc(&x);
            //y++;
            //y--;
            //++y;
            //--y;
            y+=2;
            //++z;
        }
        return NULL;
}


WINDOWS

         Windows下也提供了原子操作。在网上我找到一篇讲的很好的文章,就不再罗说了。

直接引用过来。

         下面转自http://blog.csdn.net/asce1885/article/details/5729912

所谓原子訪问,指的是一个线程在訪问某个资源的同一时候可以保证没有其它线程会在同一时刻訪问同一资源。

Interlocked系列函数提供了这种操作。全部这些函数会以原子方式来操控一个值。

Interlocked函数的工作原理取决于代码执行的CPU平台,假设是x86系列CPU。那么Interlocked函数会在总线上维持一个硬件信号,这个信号会阻止其它CPU訪问同一个内存地址。我们必须确保传给这些函数的变量地址是经过对齐的。否则这些函数可能会失败。

C执行库提供了一个_aligned_malloc函数,我们能够使用这个函数来分配一块对齐过的内存:

void * _aligned_malloc(

   size_t size,  //要分配的字节数

   size_t alignment //要对齐到的字节边界,传给alignment的值必须是2的整数幂次方

);

Interlocked函数的还有一个须要注意的点是它们运行得非常快。

调用一次Interlocked函数通常仅仅占用几个CPU周期(通常小于50)。并且不须要在用户模式和内核模式之间进行切换(这个切换通常须要占用1000个CPU周期以上)。

1)原子加减操作InterlockedExchangeAdd函数原型例如以下:

LONG __cdecl InterlockedExchangeAdd( //对32位值进行操作

 __inout  LONG volatile *Addend, //须要递增的变量地址

 __in     LONG Value //增量值,可为负值表示减法

);

LONGLONG __cdecl InterlockedExchangeAdd64(//对64位值进行操作

 __inout  LONGLONG volatile*Addend,

 __in     LONGLONG Value

);

2)InterlockedExchange函数用于原子地将32位整数设为指定的值:

LONG __cdecl InterlockedExchange(

 __inout  LONG volatile *Target, //指向要替换的32位值的指针

 __in     LONG Value //替换的值

);

返回值是指向原先的32位整数值。

InterlockedExchangePointer函数原子地用于替换地址值:

PVOID __cdecl InterlockedExchangePointer(

 __inout  PVOID volatile *Target,//指向要替换的地址值的指针

 __in     PVOID Value //替换的地址值

);

返回值是原来的地址值。

对32位应用程序来说,以上两个函数都用一个32位值替换还有一个32位值。但对64位应用程序来说,InterlockedExchange替换的是32位值,而InterlockedExchangePointer替换的是64位值。

当然,另一个函数InterlockedExchange64专门用来原子地操作64位值的:

LONGLONG __cdecl InterlockedExchange64(

 __inout  LONGLONG volatile*Target,

 __in     LONGLONG Value

);

在实现旋转锁时。InterlockedExchange函数极事实上用:

//标识一个共享资源是否正在被使用的全局变量

BOOL g_fResourceInUse = FALSE;

...

void ASCEFunc()

{

        //等待訪问共享资源

        while(InterlockedExchange(&g_fResourceInUse, TRUE) == TRUE)

                   sleep(0);

        //訪问共享资源

        ...

        //结束訪问

        InterlockedExchange(&g_fResourceInUse, FALSE);

}

注意。在使用这项技术时要小心。由于旋转锁会耗费CPU时间。特别是在单CPU机器上应该避免使用旋转锁。假设一个线程不停地循环。那么这会浪费宝贵的CPU时间,并且会阻止其它线程改变该锁的值。

3)函数InterlockedCompareExchange函数和InterlockedCompareExchangePointer函数原型例如以下:

LONG __cdecl InterlockedCompareExchange(

 __inout  LONG volatile*Destination, //当前值

 __in     LONG Exchange, //

 __in     LONG Comparand //比較值

);

PVOID __cdeclInterlockedCompareExchangePointer(

 __inout  PVOID volatile*Destination,

 __in     PVOID Exchange,

 __in     PVOID Comparand

);

这两个函数以原子方式运行一个測试和设置操作。

对32位应用程序来说。这两个函数都对32位值进行操作。在64位应用程序中。InterlockedCompareExchange对32位值进行操作而InterlockedCompareExchangePointer对64位值进行操作。函数会将当前值(Destination指向的)与參数Comparand进行比較,假设两个值同样。那么函数会将*Destination改动为Exchange參数指定的值。若不等。则*Destination保持不变。

函数会返回*Destination原来的值。全部这些操作都是一个原子运行单元来完毕的。

当然,这两个函数的64位版本号是:

LONGLONG __cdeclInterlockedCompareExchange64(

 __inout  LONGLONG volatile*Destination,

 __in     LONGLONG Exchange,

 __in     LONGLONG Comparand

);

4)Interlocked单向链表函数

InitializeSListHead函数用于创建一个空的单向链表栈:

void WINAPI InitializeSListHead(

 __inout  PSLIST_HEADER ListHead

);

InterlockedPushEntrySList函数在栈顶加入一个元素:

PSLIST_ENTRY WINAPIInterlockedPushEntrySList(

 __inout  PSLIST_HEADER ListHead,

 __inout  PSLIST_ENTRY ListEntry

);

InterlockedPopEntrySList函数移除位于栈顶的元素并将其返回:

PSLIST_ENTRY WINAPIInterlockedPopEntrySList(

 __inout  PSLIST_HEADER ListHead

);

InterlockedFlushSList函数用于清空单向链表栈:

PSLIST_ENTRY WINAPI InterlockedFlushSList(

 __inout  PSLIST_HEADER ListHead

);

QueryDepthSList函数用于返回栈中元素的数量:

USHORT WINAPI QueryDepthSList(

 __in  PSLIST_HEADER ListHead

);

单向链表栈中元素的结构是:

typedef struct _SLIST_ENTRY {

 struct _SLIST_ENTRY *Next;

} SLIST_ENTRY, *PSLIST_ENTRY;

注意:全部单向链表栈中的元素必须以MEMORY_ALLOCATION_ALIGNMENT方式对齐,使用_aligned_malloc函数就可以。

实比例如以下:

#include <windows.h>

#include <malloc.h>

#include <stdio.h>

// Structure to be used for a list item;the first member is the

// SLIST_ENTRY structure, and additionalmembers are used for data.

// Here, the data is simply a signature fortesting purposes.

typedef struct _PROGRAM_ITEM {

   SLIST_ENTRY ItemEntry;

   ULONG Signature;

} PROGRAM_ITEM, *PPROGRAM_ITEM;

int main( )

{

   ULONG Count;

   PSLIST_ENTRY pFirstEntry, pListEntry;

   PSLIST_HEADER pListHead;

   PPROGRAM_ITEM pProgramItem;

   // Initialize the list header to a MEMORY_ALLOCATION_ALIGNMENT boundary.

   pListHead = (PSLIST_HEADER)_aligned_malloc(sizeof(SLIST_HEADER),

      MEMORY_ALLOCATION_ALIGNMENT);

   if( NULL == pListHead )

    {

       printf("Memory allocation failed./n");

       return -1;

    }

   InitializeSListHead(pListHead);

   // Insert 10 items into the list.

   for( Count = 1; Count <= 10; Count += 1 )

    {

       pProgramItem = (PPROGRAM_ITEM)_aligned_malloc(sizeof(PROGRAM_ITEM),

           MEMORY_ALLOCATION_ALIGNMENT);

       if( NULL == pProgramItem )

       {

           printf("Memory allocation failed./n");

           return -1;

        }

       pProgramItem->Signature = Count;

       pFirstEntry = InterlockedPushEntrySList(pListHead,

                      &(pProgramItem->ItemEntry));

    }

   // Remove 10 items from the list and display the signature.

   for( Count = 10; Count >= 1; Count -= 1 )

    {

       pListEntry = InterlockedPopEntrySList(pListHead);

       if( NULL == pListEntry )

       {

           printf("List is empty./n");

           return -1;

       }

       pProgramItem = (PPROGRAM_ITEM)pListEntry;

       printf("Signature is %d/n", pProgramItem->Signature);

   // This example assumes that the SLIST_ENTRY structure is the

   // first member of the structure. If your structure does not

   // follow this convention, you must compute the starting address

   // of the structure before calling the free function.

       _aligned_free(pListEntry);

    }

   // Flush the list and verify that the items are gone.

   pListEntry = InterlockedFlushSList(pListHead);

   pFirstEntry = InterlockedPopEntrySList(pListHead);

   if (pFirstEntry != NULL)

    {

       printf("Error: List is not empty./n");

       return -1;

    }

   _aligned_free(pListHead);

   return 1;

}

  以上转自http://blog.csdn.net/asce1885/article/details/5729912

C++11

多线程编程常常须要操作共享的内存,在读/写过程中会导致竞争条件。

比如:

int  counter = 0;

............

++counter;  //由于++counter不时原子操作,多个线程中出现此操作时不是线程安全的。

应该用:

atomic<int>   counter(0);  //等效于atomic_int   counter(0);

............

++counter;  //此时多个线程运行++counter是一个原子操作,是线程安全的。

例:

void  func( std::atomic<int>&  counter)

{  

for( int   i=0;  i<1000;   ++i )    

++counter;

}int  main()

{  std::atomic<int>   counter(0);  

std::vector<std::thread>  threads;  

for( int i=0;   i<10;   ++i )    //线程參数总是值传递,若要传递引用。须加std::ref()。(头文件<functional>中)    

threads.push_back( std::thread{ func, std::ref(counter)} );  

for( auto&   t  : threads )    

t.join();    //调用join,假设线程未结束,则main函数堵塞于此。

  std::count<<"Result="<<counter<<std::endl;  

return 0;

}/*join的调用会导致调用线程堵塞于此,若不希望调用线程堵塞,但又想知道被调线程是否结束,应当用其他方式,比如消息...*/

原文地址:https://www.cnblogs.com/gcczhongduan/p/5122058.html