rte_atomic32_cmpset

CAS
学习无锁队列前先看一个基本概念,CAS原子指令操作。

CAS(Compare and Swap,比较并替换)原子指令,用来保障数据的一致性。

指令有三个参数,当前内存值V、旧的预期值A、更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。

在DPDK中封装后的函数如下:

rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head)
&r->prod.head指向当前内存值,*old_head为执行该操作前将r->prod.head存储到临时变量的值,*new_head为即将更新的值。

只有r->prod.head == *old_head才会将r->prod.head更新为*new_head

rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
{
        unsigned int ret = 0;

        asm volatile(
                        "	lwsync
"
                        "1:	lwarx %[ret], 0, %[dst]
"
                        "cmplw %[exp], %[ret]
"
                        "bne 2f
"
                        "stwcx. %[src], 0, %[dst]
"
                        "bne- 1b
"
                        "li %[ret], 1
"
                        "b 3f
"
                        "2:
"
                        "stwcx. %[ret], 0, %[dst]
"
                        "li %[ret], 0
"
                        "3:
"
                        "isync
"
                        : [ret] "=&r" (ret), "=m" (*dst)
                        : [dst] "r" (dst),
                          [exp] "r" (exp),
                          [src] "r" (src),
                          "m" (*dst)
                        : "cc", "memory");

        return ret;
}
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
        uint32_t single, uint32_t enqueue)
{
    //1.内存屏障
    if (enqueue)
        rte_smp_wmb();
    else
        rte_smp_rmb();
    //2.如果有其他生产者生产数据,那么需要等待其将数据生产完更新tail指针后,本生产者才能更新tail指针
    if (!single)
        while (unlikely(ht->tail != old_val))
            rte_pause();
    //3.更新tail指针,更新的位置为最新的生产位置,意味着刚刚生产的数据已经全部可以被消费者消费
    ht->tail = new_val;
}
static __rte_always_inline int
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
        return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects).
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
 * @return
 *   The number of objects enqueued, either 0 or n
 */
static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned int n, unsigned int *free_space)
{
        return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
                        __IS_MP, free_space);
}

  static __rte_always_inline unsigned int
    __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
            unsigned int n, enum rte_ring_queue_behavior behavior,
            uint32_t *old_head, uint32_t *new_head,
            uint32_t *free_entries)
    {
        const uint32_t capacity = r->capacity;
        uint32_t cons_tail;
        unsigned int max = n;
        int success;
    
        *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
        do {
            /* Reset n to the initial burst count */
            n = max;
    
            /* Ensure the head is read before tail */
            __atomic_thread_fence(__ATOMIC_ACQUIRE);
    
            /* load-acquire synchronize with store-release of ht->tail
             * in update_tail.
             */
            cons_tail = __atomic_load_n(&r->cons.tail,
                        __ATOMIC_ACQUIRE);
    
            /* The subtraction is done between two unsigned 32bits value
             * (the result is always modulo 32 bits even if we have
             * *old_head > cons_tail). So 'free_entries' is always between 0
             * and capacity (which is < size).
             */
            *free_entries = (capacity + cons_tail - *old_head);
    
            /* check that we have enough room in ring */
            if (unlikely(n > *free_entries))
                n = (behavior == RTE_RING_QUEUE_FIXED) ?
                        0 : *free_entries;
    
            if (n == 0)
                return 0;
    
            *new_head = *old_head + n;
            if (is_sp)
                r->prod.head = *new_head, success = 1;
            else
                /* on failure, *old_head is updated */
                success = __atomic_compare_exchange_n(&r->prod.head,
                        old_head, *new_head,
                        0, __ATOMIC_RELAXED,
                        __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
        return n;
    }

参考;

https://www.sunxidong.com/576.html

原文地址:https://www.cnblogs.com/dream397/p/13646492.html