多线程下队列的优化

0. 前言

  前段时间在看无锁队列相关的东西时发现了一篇关于加锁队列算法优化的文章,故看了下其原理以及使用C实现了该队列。该队列在Java中类LinkedBlockingQueue以及实现了该功能。

  相关文章:Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms

       多线程队列的算法优化

1. 基本原理

  对于多线程下,多生产者以及多消费者的环境下,可以通过在队首以及队尾分开加锁进行优化。初始时使用一个空节点,在向空队列插入元素或者弹出最后一个元素节点时,可以不需要获取两边的锁(因为需要同时修改tail和head指针,但是加了空节点则可以阻止这种情况的发生)。

2. C简单实现

#include <pthread.h>
#include <stdlib.h>
#include <assert.h>

typedef struct QNode
{
    void *         pValue;
    struct QNode *    pNext;
} QNode;

typedef struct Queue
{
    QNode * pHead;
    QNode * pTail;
    pthread_mutex_t headLock;
    pthread_mutex_t tailLock;
} TQueue;


TQueue * InitTQueue()
{
    TQueue * pQueue = malloc(sizeof(*pQueue));
    assert(pQueue);

    QNode *pEmptyNode = malloc(sizeof(*pEmptyNode));
    assert(pEmptyNode);

    pEmptyNode->pNext = NULL;
    pQueue->pHead = pQueue->pTail = pEmptyNode;
    pthread_mutex_init(&pQueue->headLock, NULL);
    pthread_mutex_init(&pQueue->tailLock, NULL);

    return pQueue;
}


void EnQueue(TQueue *pQueue, void *pValue)
{
    assert(pQueue);

    QNode *pNewNode = malloc(sizeof(*pNewNode));
    pNewNode->pValue = pValue;
    pNewNode->pNext = NULL;

    pthread_mutex_lock(&pQueue->tailLock);

    pQueue->pTail->pNext = pNewNode;
    pQueue->pTail = pNewNode;

    pthread_mutex_unlock(&pQueue->tailLock);
}


void * DeQueue(TQueue *pQueue)
{
    void *pValue = NULL;

    pthread_mutex_lock(&pQueue->headLock);

    QNode *pPopNode = pQueue->pHead;
    if (NULL == pPopNode->pNext)
    {
        pthread_mutex_unlock(&pQueue->headLock);
        return NULL;
    }
    pValue = pPopNode->pNext->pValue;
    pQueue->pHead = pPopNode->pNext;

    pthread_mutex_unlock(&pQueue->headLock);

    free(pPopNode);
    return pValue;
}

void * Producer(void *pArg)
{
    TQueue * pQueue = (TQueue*)pArg;

    const int nMsgNums = 10;

    for (int i = 1; i <= nMsgNums; ++i)
    {
        EnQueue(pQueue, (void*)i);
    }
    EnQueue(pQueue, 0);
}

void * Consumer(void *pArg)
{
    TQueue * pQueue = (TQueue*)pArg;

    while (1)
    {
        int nMsg = (int)DeQueue(pQueue);
        if (nMsg == 0)
        {
            break;
        }
        printf("%u msg[%d]
", pthread_self(), nMsg);
    }
}

#define MAX_PTHREADS    3

int main(int argc, char const *argv[])
{
    TQueue * pQueue = InitTQueue();

    assert(pQueue);

    pthread_t producerId[MAX_PTHREADS];
    pthread_t consumerId[MAX_PTHREADS];

    for (int i = 0; i < MAX_PTHREADS; ++i)
    {
        pthread_create(&producerId[i], NULL, Producer, (void*)pQueue);
        pthread_create(&consumerId[i], NULL, Consumer, (void*)pQueue);
    }

    for (int i = 0; i < MAX_PTHREADS; ++i)
    {
        pthread_join(producerId[i], NULL);
        pthread_join(consumerId[i], NULL);
    }

    return 0;
}
原文地址:https://www.cnblogs.com/jabnih/p/5067770.html