CAS
学习无锁队列前先看一个基本概念,CAS原子指令操作。
CAS(Compare and Swap,比较并替换)原子指令,用来保障数据的一致性。
指令有三个参数,当前内存值V、旧的预期值A、更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。
在DPDK中封装后的函数如下:
rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head)
&r->prod.head指向当前内存值,*old_head为执行该操作前将r->prod.head存储到临时变量的值,*new_head为即将更新的值。
只有r->prod.head == *old_head才会将r->prod.head更新为*new_head
rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) { unsigned int ret = 0; asm volatile( " lwsync " "1: lwarx %[ret], 0, %[dst] " "cmplw %[exp], %[ret] " "bne 2f " "stwcx. %[src], 0, %[dst] " "bne- 1b " "li %[ret], 1 " "b 3f " "2: " "stwcx. %[ret], 0, %[dst] " "li %[ret], 0 " "3: " "isync " : [ret] "=&r" (ret), "=m" (*dst) : [dst] "r" (dst), [exp] "r" (exp), [src] "r" (src), "m" (*dst) : "cc", "memory"); return ret; }
static __rte_always_inline void update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue) { //1.内存屏障 if (enqueue) rte_smp_wmb(); else rte_smp_rmb(); //2.如果有其他生产者生产数据,那么需要等待其将数据生产完更新tail指针后,本生产者才能更新tail指针 if (!single) while (unlikely(ht->tail != old_val)) rte_pause(); //3.更新tail指针,更新的位置为最新的生产位置,意味着刚刚生产的数据已经全部可以被消费者消费 ht->tail = new_val; }
static __rte_always_inline int rte_ring_mp_enqueue(struct rte_ring *r, void *obj) { return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; }
* * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. * @param free_space * if non-NULL, returns the amount of space in the ring after the * enqueue operation has finished. * @return * The number of objects enqueued, either 0 or n */ static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, __IS_MP, free_space); }
static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; uint32_t cons_tail; unsigned int max = n; int success; *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED); do { /* Reset n to the initial burst count */ n = max; /* Ensure the head is read before tail */ __atomic_thread_fence(__ATOMIC_ACQUIRE); /* load-acquire synchronize with store-release of ht->tail * in update_tail. */ cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE); /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > cons_tail). So 'free_entries' is always between 0 * and capacity (which is < size). */ *free_entries = (capacity + cons_tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *free_entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries; if (n == 0) return 0; *new_head = *old_head + n; if (is_sp) r->prod.head = *new_head, success = 1; else /* on failure, *old_head is updated */ success = __atomic_compare_exchange_n(&r->prod.head, old_head, *new_head, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED); } while (unlikely(success == 0)); return n; }
参考;