rte_vhost_enqueue_burst + rte_vhost_dequeue_burst + vhost_user_set_vring_call +vhost_user_set_vring_call

下图显示了作为DPDK-APP的一部分运行的虚拟主机用户库如何使用virtio-device-model和virtio-pci设备与qemu和客户机进行交互:

几个要点:

  • virtio内存区域最初是由客户机分配的。
  • 相应的virtio驱动程序通常通过virtio规范中定义的PCI BARs配置接口与virtio设备进行交互。
  • virtio-device-model(位于QEMU内部)使用vhost-user协议配置vhost-user库,以及设置irqfd和ioeventfd文件描述符。
  • 客户机分配的virtio内存区域由vhost用户库(即DPDK应用程序)映射(使用mmap 系统调用)。
  • 结果是,DPDK应用程序可以直接在客户机内存中读取和写入数据包,并使用irqfd和ioeventfd机制直接对客户机发出通知。

rte_vhost_enqueue_burst通知虚拟机eventfd_write

static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
    struct rte_mbuf **pkts, uint32_t count)
{
    struct vhost_virtqueue *vq;
    struct vring_desc *desc;
    struct rte_mbuf *buff;
    /* The virtio_hdr is initialised to 0. */
    struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
    uint64_t buff_addr = 0;
    uint64_t buff_hdr_addr = 0;
    uint32_t head[MAX_PKT_BURST];
    uint32_t head_idx, packet_success = 0;
    uint16_t avail_idx, res_cur_idx;
    uint16_t res_base_idx, res_end_idx;
    uint16_t free_entries;
    uint8_t success = 0;

    LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()
", dev->device_fh);
    if (unlikely(queue_id != VIRTIO_RXQ)) {
        LOG_DEBUG(VHOST_DATA, "mq isn't supported in this version.
");
        return 0;
    }

    vq = dev->virtqueue[VIRTIO_RXQ];
    count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;

    /*
     * As many data cores may want access to available buffers,
     * they need to be reserved.
     */
    do {
        res_base_idx = vq->last_used_idx_res;
        avail_idx = *((volatile uint16_t *)&vq->avail->idx);

        free_entries = (avail_idx - res_base_idx);
        /*check that we have enough buffers*/
        if (unlikely(count > free_entries))
            count = free_entries;

        if (count == 0)
            return 0;

        res_end_idx = res_base_idx + count;
        /* vq->last_used_idx_res is atomically updated. */
        /* TODO: Allow to disable cmpset if no concurrency in application. */
        success = rte_atomic16_cmpset(&vq->last_used_idx_res,
                res_base_idx, res_end_idx);
    } while (unlikely(success == 0));
    res_cur_idx = res_base_idx;
    LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d
",
            dev->device_fh, res_cur_idx, res_end_idx);

    /* Prefetch available ring to retrieve indexes. */
    rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);

    /* Retrieve all of the head indexes first to avoid caching issues. */
    for (head_idx = 0; head_idx < count; head_idx++)
        head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) &
                    (vq->size - 1)];

    /*Prefetch descriptor index. */
    rte_prefetch0(&vq->desc[head[packet_success]]);

    while (res_cur_idx != res_end_idx) {
        uint32_t offset = 0, vb_offset = 0;
        uint32_t pkt_len, len_to_cpy, data_len, total_copied = 0;
        uint8_t hdr = 0, uncompleted_pkt = 0;

        /* Get descriptor from available ring */
        desc = &vq->desc[head[packet_success]];

        buff = pkts[packet_success];

        /* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
        buff_addr = gpa_to_vva(dev, desc->addr);
        /* Prefetch buffer address. */
        rte_prefetch0((void *)(uintptr_t)buff_addr);

        /* Copy virtio_hdr to packet and increment buffer address */
        buff_hdr_addr = buff_addr;

        /*
         * If the descriptors are chained the header and data are
         * placed in separate buffers.
         */
        if ((desc->flags & VRING_DESC_F_NEXT) &&
            (desc->len == vq->vhost_hlen)) {
            desc = &vq->desc[desc->next];
            /* Buffer address translation. */
            buff_addr = gpa_to_vva(dev, desc->addr);
        } else {
            vb_offset += vq->vhost_hlen;
            hdr = 1;
        }

        pkt_len = rte_pktmbuf_pkt_len(buff);
        data_len = rte_pktmbuf_data_len(buff);
        len_to_cpy = RTE_MIN(data_len,
            hdr ? desc->len - vq->vhost_hlen : desc->len);
        while (total_copied < pkt_len) {
            /* Copy mbuf data to buffer */
            rte_memcpy((void *)(uintptr_t)(buff_addr + vb_offset),
                rte_pktmbuf_mtod_offset(buff, const void *, offset),
                len_to_cpy);
            PRINT_PACKET(dev, (uintptr_t)(buff_addr + vb_offset),
                len_to_cpy, 0);

            offset += len_to_cpy;
            vb_offset += len_to_cpy;
            total_copied += len_to_cpy;

            /* The whole packet completes */
            if (total_copied == pkt_len)
                break;

            /* The current segment completes */
            if (offset == data_len) {
                buff = buff->next;
                offset = 0;
                data_len = rte_pktmbuf_data_len(buff);
            }

            /* The current vring descriptor done */
            if (vb_offset == desc->len) {
                if (desc->flags & VRING_DESC_F_NEXT) {
                    desc = &vq->desc[desc->next];
                    buff_addr = gpa_to_vva(dev, desc->addr);
                    vb_offset = 0;
                } else {
                    /* Room in vring buffer is not enough */
                    uncompleted_pkt = 1;
                    break;
                }
            }
            len_to_cpy = RTE_MIN(data_len - offset, desc->len - vb_offset);
        };

        /* Update used ring with desc information */
        vq->used->ring[res_cur_idx & (vq->size - 1)].id =
                            head[packet_success];

        /* Drop the packet if it is uncompleted */
        if (unlikely(uncompleted_pkt == 1))
            vq->used->ring[res_cur_idx & (vq->size - 1)].len =
                            vq->vhost_hlen;
        else
            vq->used->ring[res_cur_idx & (vq->size - 1)].len =
                            pkt_len + vq->vhost_hlen;

        res_cur_idx++;
        packet_success++;

        if (unlikely(uncompleted_pkt == 1))
            continue;

        rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
            (const void *)&virtio_hdr, vq->vhost_hlen);

        PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);

        if (res_cur_idx < res_end_idx) {
            /* Prefetch descriptor index. */
            rte_prefetch0(&vq->desc[head[packet_success]]);
        }
    }

    rte_compiler_barrier();

    /* Wait until it's our turn to add our buffer to the used ring. */
    while (unlikely(vq->last_used_idx != res_base_idx))
        rte_pause();

    *(volatile uint16_t *)&vq->used->idx += count;
    vq->last_used_idx = res_end_idx;

    /* flush used->idx update before we read avail->flags. */
    rte_mb();

    /* Kick the guest if necessary. */
    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
        eventfd_write((int)vq->callfd, 1);
    return count;
}

eth_vhost_install_intr(struct rte_eth_dev *dev)

  for (i = 0; i < nb_rxq; i++) {
                vq = dev->data->rx_queues[i];
                if (!vq) {
                        VHOST_LOG(INFO, "rxq-%d not setup yet, skip!
", i);
                        continue;
                }

                ret = rte_vhost_get_vhost_vring(vq->vid, (i << 1) + 1, &vring);
                if (ret < 0) {
                        VHOST_LOG(INFO,
                                "Failed to get rxq-%d's vring, skip!
", i);
                        continue;
                }

                if (vring.kickfd < 0) {
                        VHOST_LOG(INFO,
                                "rxq-%d's kickfd is invalid, skip!
", i);
                        continue;
                }
                dev->intr_handle->intr_vec[i] = RTE_INTR_VEC_RXTX_OFFSET + i;
                dev->intr_handle->efds[i] = vring.kickfd;
                count++;
                VHOST_LOG(INFO, "Installed intr vec for rxq-%d
", i);
        }
vhost-user端,在vhost_user_set_vring_kick中,关键的一句

vq->kickfd = file.fd;

 

vhost_user_set_vring_call(struct virtio_net **pdev, struct VhostUserMsg *msg,
                        int main_fd __rte_unused)
{
        struct virtio_net *dev = *pdev;
        struct vhost_vring_file file;
        struct vhost_virtqueue *vq;
        int expected_fds;

        expected_fds = (msg->payload.u64 & VHOST_USER_VRING_NOFD_MASK) ? 0 : 1;
        if (validate_msg_fds(msg, expected_fds) != 0)
                return RTE_VHOST_MSG_RESULT_ERR;

        file.index = msg->payload.u64 & VHOST_USER_VRING_IDX_MASK;
        if (msg->payload.u64 & VHOST_USER_VRING_NOFD_MASK)
                file.fd = VIRTIO_INVALID_EVENTFD;
        else
                file.fd = msg->fds[0];
        RTE_LOG(INFO, VHOST_CONFIG,
                "vring call idx:%d file:%d
", file.index, file.fd);

        vq = dev->virtqueue[file.index];
        if (vq->callfd >= 0)
                close(vq->callfd);

        vq->callfd = file.fd;

        return RTE_VHOST_MSG_RESULT_OK;
}

rte_vhost_dequeue_burst通知虚拟机eventfd_write

static inline void __attribute__((always_inline))
update_used_idx(struct virtio_net *dev, struct vhost_virtqueue *vq,
        uint32_t count)
{
    if (unlikely(count == 0))
        return;

    rte_smp_wmb();
    rte_smp_rmb();

    vq->used->idx += count;
    vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
            sizeof(vq->used->idx));

    /* Kick guest if required. */
    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
            && (vq->callfd >= 0))
        eventfd_write(vq->callfd, (eventfd_t)1);
}
[root@localhost ~]# lsof /tmp/vhost1 
COMMAND     PID USER   FD   TYPE             DEVICE SIZE/OFF   NODE NAME
qemu-syst 78915 root   11u  unix 0xffffa05fcf5ebd00      0t0 793679 /tmp/vhost1
qemu-syst 78915 root   13u  unix 0xffff805fc69da200      0t0 793680 /tmp/vhost1
[root@localhost ~]# lsof /tmp/vhost1 
COMMAND     PID USER   FD   TYPE             DEVICE SIZE/OFF   NODE NAME
qemu-syst 78915 root   11u  unix 0xffffa05fcf5ebd00      0t0 793679 /tmp/vhost1
qemu-syst 78915 root   13u  unix 0xffff805fc69da200      0t0 793680 /tmp/vhost1
[root@localhost ~]# ps -elf | grep switch
4 S root          1      0  0  80   0 -  2654 SyS_ep 12月16 ?      00:00:14 /usr/lib/systemd/systemd --switched-root --system --deserialize 22
1 S root      14326  14325  0  80   0 -   302 poll_s 12月16 ?      00:00:07 ovsdb-server /etc/openvswitch/conf.db --remote=punix:/var/run/openvswitch/db.sock --private-key=db:Open_vSwitch,SSL,private_key --certificate=db:Open_vSwitch,SSL,certificate --bootstrap-ca-cert=db:Open_vSwitch,SSL,ca_cert --no-chdir --log-file=/var/log/openvswitch/ovsdb-server.log --pidfile=/var/run/openvswitch/ovsdb-server.pid --detach --monitor
0 S root      78914  78861  0  80   0 -  1738 do_wai 04:13 pts/1    00:00:00 bash vhost_switch.sh
4 R root      78929  78891 99  80   0 - 8425532 -    04:13 pts/2    00:03:17 build/app/switch -c 3 -m 256
0 S root      79006  78954  0  80   0 -  1731 pipe_w 04:14 pts/3    00:00:00 grep --color=auto switch
[root@localhost ~]# lsof -p  78929 | grep vhost1 --------------------这个进程竟然没有去打开
[root@localhost ~]# lsof -p  78915 | grep vhost1
qemu-syst 78915 root   11u     unix 0xffffa05fcf5ebd00        0t0    793679 /tmp/vhost1
qemu-syst 78915 root   13u     unix 0xffff805fc69da200        0t0    793680 /tmp/vhost1
[root@localhost ~]# ps -elf | grep qemu
2 S root      78915  78914  4  80   0 - 56026 poll_s 04:13 pts/1    00:00:07 qemu-system-aarch64 -name vm2 -nographic -enable-kvm -M virt,usb=off -cpu host -smp 2 -m 4096 -global virtio-blk-device.scsi=off -device virtio-scsi-device,id=scsi -kernel vmlinuz-4.18 --append console=ttyAMA0  root=UUID=6a09973e-e8fd-4a6d-a8c0-1deb9556f477 -initrd initramfs-4.18 -drive file=vhuser-test1.qcow2 -m 2048M -numa node,memdev=mem -mem-prealloc -object memory-backend-file,id=mem,size=2048M,mem-path=/dev/hugepages,share=on -chardev socket,id=char1,path=/tmp/vhost1,server -netdev type=vhost-user,id=mynet1,chardev=char1,vhostforce -device virtio-net-pci,netdev=mynet1,mac=00:00:00:00:00:01,mrg_rxbuf=off
0 S root      79025  78954  0  80   0 -  1730 pipe_w 04:15 pts/3    00:00:00 grep --color=auto qemu
[root@localhost ~]# 
[root@localhost dpdk-19.11]# lsof /tmp/vhost1
COMMAND     PID USER   FD   TYPE             DEVICE SIZE/OFF   NODE NAME
qemu-syst 78523 root   11u  unix 0xffffa03fce575a00      0t0 730594 /tmp/vhost1
qemu-syst 78523 root   13u  unix 0xffff805fc69d9d80      0t0 730595 /tmp/vhost1
[root@localhost dpdk-19.11]# ps | grep qemu
[root@localhost dpdk-19.11]# ps -elf  | grep qemu
2 S root      78523  78522  4  80   0 - 46679 poll_s 04:00 pts/8    00:00:06 qemu-system-aarch64 -name vm2 -nographic -enable-kvm -M virt,usb=off -cpu host -smp 2 -m 4096 -global virtio-blk-device.scsi=off -device virtio-scsi-device,id=scsi -kernel vmlinuz-4.18 --append console=ttyAMA0  root=UUID=6a09973e-e8fd-4a6d-a8c0-1deb9556f477 -initrd initramfs-4.18 -drive file=vhuser-test1.qcow2 -m 2048M -numa node,memdev=mem -mem-prealloc -object memory-backend-file,id=mem,size=2048M,mem-path=/dev/hugepages,share=on -chardev socket,id=char1,path=/tmp/vhost1,server -netdev type=vhost-user,id=mynet1,chardev=char1,vhostforce -device virtio-net-pci,netdev=mynet1,mac=00:00:00:00:00:01,mrg_rxbuf=off
0 S root      78750  13131  0  80   0 -  1729 pipe_w 04:02 pts/5    00:00:00 grep --color=auto qemu
 
[root@localhost dpdk-19.11]# ps -elf  |  grep switch
4 S root          1      0  0  80   0 -  2654 SyS_ep Dec16 ?        00:00:14 /usr/lib/systemd/systemd --switched-root --system --deserialize 22
1 S root      14326  14325  0  80   0 -   302 poll_s Dec16 ?        00:00:07 ovsdb-server /etc/openvswitch/conf.db --remote=punix:/var/run/openvswitch/db.sock --private-key=db:Open_vSwitch,SSL,private_key --certificate=db:Open_vSwitch,SSL,certificate --bootstrap-ca-cert=db:Open_vSwitch,SSL,ca_cert --no-chdir --log-file=/var/log/openvswitch/ovsdb-server.log --pidfile=/var/run/openvswitch/ovsdb-server.pid --detach --monitor
0 S root      78522  13196  0  80   0 -  1738 do_wai 04:00 pts/8    00:00:00 bash vhost_switch.sh
4 R root      78593  13174 99  80   0 - 8440761 -    04:02 pts/7    00:01:11 build/app/switch
0 S root      78761  13131  0  80   0 -  1730 pipe_w 04:02 pts/5    00:00:00 grep --color=auto switch
[root@localhost dpdk-19.11]# 

rte_vhost_enqueue_burst

forward_packet(struct rte_mbuf *mbuf, struct dev_info *s_dev)
{
    struct rte_ether_hdr *pkt_hdr;
    struct dev_info *dev;
    struct rte_mbuf *tbuf;
    int ret;

    /* Get the Ethernet header and find destination output */
    pkt_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
    ret = rte_hash_lookup(Mac_output_map, &pkt_hdr->d_addr);

    /* Broadcast */
    if(ret < 0) {
        TAILQ_FOREACH(dev, &Dev_list, dev_entry) {
                        printf(" forward_packet 
 " );
            if(dev == s_dev)
                continue;

            if(dev->virtual) {
                                 if(unlikely(dev->state == DEVICE_CLOSING))
                                {
                                        continue;
                                }
//不要执行
rte_pktmbuf_clone,virtio_dev_rx_split这些函数会copy



rte_vhost_enqueue_burst(dev
->id, VIRTIO_RXQ, &mbuf, 1); } else { tbuf = rte_pktmbuf_clone(mbuf, Mbuf_pool); ret = rte_eth_tx_burst(dev->id, 0, &tbuf, 1); if(unlikely(ret == 0)) rte_pktmbuf_free(tbuf); } } rte_pktmbuf_free(mbuf); return; } /* Unicast */ dev = Output_table[ret]; if(dev->virtual) { if(unlikely(dev->state != DEVICE_CLOSING)) rte_vhost_enqueue_burst(dev->id, VIRTIO_RXQ, &mbuf, 1); rte_pktmbuf_free(mbuf); } else { ret = rte_eth_tx_burst(dev->id, 0, &mbuf, 1); if(unlikely(ret == 0)) rte_pktmbuf_free(mbuf); } }
rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
        struct rte_mbuf **pkts, uint16_t count)
{
        struct virtio_net *dev = get_device(vid);

        if (!dev)
                return 0;

        if (unlikely(!(dev->flags & VIRTIO_DEV_BUILTIN_VIRTIO_NET))) {
                RTE_LOG(ERR, VHOST_DATA,
                        "(%d) %s: built-in vhost net backend is disabled.
",
                        dev->vid, __func__);
                return 0;
        }

        return virtio_dev_rx(dev, queue_id, pkts, count);
}
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
        struct rte_mbuf **pkts, uint32_t count)
{
        struct vhost_virtqueue *vq;
        uint32_t nb_tx = 0;

        VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s
", dev->vid, __func__);
        if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->nr_vring))) {
                RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.
",
                        dev->vid, __func__, queue_id);
                return 0;
        }

        vq = dev->virtqueue[queue_id];

        rte_spinlock_lock(&vq->access_lock);

        if (unlikely(vq->enabled == 0))
                goto out_access_unlock;

        if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
                vhost_user_iotlb_rd_lock(vq);

        if (unlikely(vq->access_ok == 0))
                if (unlikely(vring_translate(dev, vq) < 0))
                        goto out;

        count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
        if (count == 0)
                goto out;

        if (vq_is_packed(dev))
                nb_tx = virtio_dev_rx_packed(dev, vq, pkts, count);
        else
                nb_tx = virtio_dev_rx_split(dev, vq, pkts, count);

out:
        if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
                vhost_user_iotlb_rd_unlock(vq);

out_access_unlock:
        rte_spinlock_unlock(&vq->access_lock);

        return nb_tx;
}
static __rte_noinline uint32_t
virtio_dev_rx_packed(struct virtio_net *dev,
                     struct vhost_virtqueue *vq,
                     struct rte_mbuf **pkts,
                     uint32_t count)
{
        uint32_t pkt_idx = 0;
        uint32_t remained = count;

        do {
                rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);

                if (remained >= PACKED_BATCH_SIZE) {
                        if (!virtio_dev_rx_batch_packed(dev, vq,
                                                        &pkts[pkt_idx])) {
                                pkt_idx += PACKED_BATCH_SIZE;
                                remained -= PACKED_BATCH_SIZE;
                                continue;
                        }
                }

                if (virtio_dev_rx_single_packed(dev, vq, pkts[pkt_idx]))
                        break;
                pkt_idx++;
                remained--;

        } while (pkt_idx < count);

        if (vq->shadow_used_idx) {
                do_data_copy_enqueue(dev, vq);
                vhost_flush_enqueue_shadow_packed(dev, vq);
        }

        if (pkt_idx)
                vhost_vring_call_packed(dev, vq);

        return pkt_idx;
}
static __rte_always_inline uint32_t
virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq,
    struct rte_mbuf **pkts, uint32_t count)
{
    uint32_t pkt_idx = 0;
    uint16_t num_buffers;
    struct buf_vector buf_vec[BUF_VECTOR_MAX];
    uint16_t avail_head;

    rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]);
    avail_head = *((volatile uint16_t *)&vq->avail->idx);

    for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
        uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen;
        uint16_t nr_vec = 0;
        /* 为拷贝当前mbuf后续预留avail desc */
        if (unlikely(reserve_avail_buf_split(dev, vq,
                        pkt_len, buf_vec, &num_buffers,
                        avail_head, &nr_vec) < 0)) {
            vq->shadow_used_idx -= num_buffers;
            break;
        }
        /* 拷贝mbuf到avail desc */
        if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx],
                        buf_vec, nr_vec,
                        num_buffers) < 0) {
            vq->shadow_used_idx -= num_buffers;
            break;
        }
        /* 更新last_avail_idx */
        vq->last_avail_idx += num_buffers;
    }
    /* 小包的批处理拷贝 */
    do_data_copy_enqueue(dev, vq);

    if (likely(vq->shadow_used_idx)) {
        flush_shadow_used_ring_split(dev, vq); /* 更新used ring */
        vhost_vring_call_split(dev, vq); /* 通知前端 */
    }

    return pkt_idx;
}

 debug

[root@localhost LearningSwitch-DPDK]# gdb build/app/switch
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-119.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "aarch64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /data1/ovs/LearningSwitch-DPDK/build/app/switch...done.
(gdb) set args -c 3 -m 256
(gdb) b vhost_user_set_vring_kick
Breakpoint 1 at 0x510398: file /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c, line 1795.
(gdb) b vhost_user_set_vring_call
Breakpoint 2 at 0x50fb14: file /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c, line 1563.
(gdb) r
(gdb) bt
#0  vhost_user_set_vring_call (pdev=0xffffbc3ecfa8, msg=0xffffbc3ecd00, main_fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:1563
#1  0x0000000000511e24 in vhost_user_msg_handler (vid=0, fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:2689
#2  0x0000000000504eb0 in vhost_user_read_cb (connfd=45, dat=0x15a3de0, remove=0xffffbc3ed064) at /data1/dpdk-19.11/lib/librte_vhost/socket.c:306
#3  0x0000000000502198 in fdset_event_dispatch (arg=0x11f37f8 <vhost_user+8192>) at /data1/dpdk-19.11/lib/librte_vhost/fd_man.c:286
#4  0x00000000005a05a4 in rte_thread_init (arg=0x15a42c0) at /data1/dpdk-19.11/lib/librte_eal/common/eal_common_thread.c:165
#5  0x0000ffffbe617d38 in start_thread (arg=0xffffbc3ed910) at pthread_create.c:309
#6  0x0000ffffbe55f5f0 in thread_start () at ../sysdeps/unix/sysv/linux/aarch64/clone.S:91
(gdb) 
Breakpoint 2, vhost_user_set_vring_call (pdev=0xffffbc3ecfa8, msg=0xffffbc3ecd00, main_fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:1563
1563            struct virtio_net *dev = *pdev;
(gdb) bt
#0  vhost_user_set_vring_call (pdev=0xffffbc3ecfa8, msg=0xffffbc3ecd00, main_fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:1563
#1  0x0000000000511e24 in vhost_user_msg_handler (vid=0, fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:2689
#2  0x0000000000504eb0 in vhost_user_read_cb (connfd=45, dat=0x15a3de0, remove=0xffffbc3ed064) at /data1/dpdk-19.11/lib/librte_vhost/socket.c:306
#3  0x0000000000502198 in fdset_event_dispatch (arg=0x11f37f8 <vhost_user+8192>) at /data1/dpdk-19.11/lib/librte_vhost/fd_man.c:286
#4  0x00000000005a05a4 in rte_thread_init (arg=0x15a42c0) at /data1/dpdk-19.11/lib/librte_eal/common/eal_common_thread.c:165
#5  0x0000ffffbe617d38 in start_thread (arg=0xffffbc3ed910) at pthread_create.c:309
#6  0x0000ffffbe55f5f0 in thread_start () at ../sysdeps/unix/sysv/linux/aarch64/clone.S:91
(gdb) 
Breakpoint 1, vhost_user_set_vring_kick (pdev=0xffffbc3ecfa8, msg=0xffffbc3ecd00, main_fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:1795
1795            struct virtio_net *dev = *pdev;
(gdb) bt
#0  vhost_user_set_vring_kick (pdev=0xffffbc3ecfa8, msg=0xffffbc3ecd00, main_fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:1795
#1  0x0000000000511e24 in vhost_user_msg_handler (vid=0, fd=45) at /data1/dpdk-19.11/lib/librte_vhost/vhost_user.c:2689
#2  0x0000000000504eb0 in vhost_user_read_cb (connfd=45, dat=0x15a3de0, remove=0xffffbc3ed064) at /data1/dpdk-19.11/lib/librte_vhost/socket.c:306
#3  0x0000000000502198 in fdset_event_dispatch (arg=0x11f37f8 <vhost_user+8192>) at /data1/dpdk-19.11/lib/librte_vhost/fd_man.c:286
#4  0x00000000005a05a4 in rte_thread_init (arg=0x15a42c0) at /data1/dpdk-19.11/lib/librte_eal/common/eal_common_thread.c:165
#5  0x0000ffffbe617d38 in start_thread (arg=0xffffbc3ed910) at pthread_create.c:309
#6  0x0000ffffbe55f5f0 in thread_start () at ../sysdeps/unix/sysv/linux/aarch64/clone.S:91
(gdb) 

设置共享内存

在虚拟机中,内存是由QEMU进程提前分配好的。QEMU一旦选择了使用vhost-user的方式进行网络通信,就需要配置VM的内存访问方式为共享的,具体的命令行参数在DPDK的文档中也有说明:

-object memory-backend-file,share=on,...

这意味着虚拟机的内存必须是预先分配好的大页面且允许共享给其他进程,具体原因在前一篇文章讲过,因为OVS和QEMU都是用户态进程,而数据包拷贝过程中,需要OVS进程里拥有访问QEMU进程里的虚拟机buffer的能力,所以VM内存必须被共享给OVS进程。

一些节约内存的方案,像virtio_balloon这样动态改变VM内存的方法不再可用,原因也很简单,OVS不可能一直跟着虚拟机不断改变共享内存啊,这样多费事。

而在vhost库中,后端驱动接收控制信道来的消息,主动映射VM内存的代码如下:

static int vhost_user_set_mem_table(struct virtio_net *dev, struct VhostUserMsg *pmsg)
{
   ...
   mmap_size = RTE_ALIGN_CEIL(mmap_size, alignment);

   mmap_addr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
                 MAP_SHARED | MAP_POPULATE, fd, 0);
   ...
   reg->mmap_addr = mmap_addr;
   reg->mmap_size = mmap_size;
   reg->host_user_addr = (uint64_t)(uintptr_t)mmap_addr +
                      mmap_offset;
   ...
}

它使用的linux库函数mmap()来映射VM内存,详见linux编程手册http://man7.org/linux/man-pages/man2/mmap.2.html。注意到在映射内存之前它还干了一件事,设置内存对齐,这是因为mmap函数映射内存的基本单位是一个页,也就是说起始地址和大小都必须是页大小是整数倍,在大页面环境下,就是2MB或者1GB。只有对齐以后才能保证映射共享内存不出错,以及后续访存行为不会越界。

后面三行是保存地址转换关系的信息。这里涉及到几种地址转换,在vhost-user中最复杂的就是地址转换。从QEMU进程角度看虚拟机内存有两种地址:GPA(Guest Physical Address)和QVA(QEMU Virtual Address);从OVS进程看虚拟机内存也有两种地址GPA(Guest Physical Address)和VVA(vSwitch Virtual Address)

GPA是virtio最重要的地址,在virtio的标准中,用来存储数据包地址的虚拟队列virtqueue里面每项都是以GPA表示的。但是对于操作系统而言,我们在进程内访问实际使用的都是虚拟地址,物理地址已经被屏蔽,也就是说进程只有拿到了物理地址所对应的虚拟地址才能够去访存(我们编程使用的指针都是虚拟地址)。

QEMU进程容易实现,毕竟作为VM主进程给VM预分配内存时就建立了QVA到GPA的映射关系。


 
                                         共享内存映射关系 

对于OVS进程,以上图为例,mmap()函数返回的也是虚拟地址,是VM内存映射到OVS地址空间的起始地址(就是说我把这一块内存映射在了以mmap_addr起始的大小为1GB的空间里)。这样给OVS进程一个GPA,OVS进程可以利用地址偏移算出对应的VVA,然后实施访存。

但是实际映射内存比图例复杂的多,可能VM内存被拆分成了几块,有些块起始的GPA不为0,这就需要在映射块中再加一个GPA偏移量,才能完整保留下来VVA与GPA之间的地址对应关系。这些对应关系是后续数据通路实现的基础。

还有一点技术细节值得注意的是,在socket中,一般是不可以直接传递文件描述符的,文件描述符在编程角度就是一个int型变量,直接传过去就是一个整数,这里也是做了一点技术上的trick,感兴趣的话也可以研究下


 

设置虚拟队列信息

虚拟队列的结构由三部分构成:avail ring、used ring和desc ring。这里稍微详细说明一下这三个ring的作用与设计思想。

传统网卡设计中只有一个环表,但是有两个指针,分别为驱动和网卡设备管理的。这就是一个典型的生产者-消费者问题,生产数据包的一方移动指针,另一方追逐,直到全部消费完。但是这么做的缺点在于,只能顺序执行,前一个描述符处理完之前,后一个只能等待。

但在虚拟队列中,把生产者和消费者分离开来。desc ring中存放的是真实的数据包描述符(就是数据包或其缓冲区地址),avail ring和used ring中存放的指向desc ring中项的索引。前端驱动将生产出来的描述符放到avail ring中,后端驱动把已经消费的描述符放到used ring中(其实就是写desc ring中的索引,即序号)。这样前端驱动就可以根据used ring来回收已经使用的描述符,即使中间有描述符被占用,也不会影响被占用描述符之后的描述符的回收。另外DPDK还针对这种结构做了一种cache层面上预取的优化,使之更加高效。

在控制信道建立完共享内存以后,还需要在后端也建立与前端一样的虚拟队列数据结构。所需要的信息主要有:desc ring的项数(不同的前端驱动不同,比如DPDK virtio驱动和内核virtio驱动就不一样)、上次avail ring用到哪(这主要是针对重连或动态迁移的,第一次建立连接此项应为0)、虚拟队列三个ring的起始地址、设置通知信号。

这几项消息处理完以后,后端驱动利用接收到的起始地址创建了一个和前端驱动一模一样的虚拟队列数据结构,并已经准备好收发数据包。其中最后两项eventfd是用于需要通知的场景,例如:虚拟机使用内核virtio驱动,每次OVS的vhost端口往虚拟机发送数据包完成,都需要使用eventfd通知内核驱动去接收该数据包,在轮询驱动下,这些eventfd就没有意义了。

另外,VHOST_USER_GET_VRING_BASE是一个非常奇特的信号,只在虚拟机关机或者断开时会由QEMU发送给OVS进程,意味着断开数据通路。

2.数据通路处理

数据通路的实现在DPDK的lib/librte_vhost/virtio_net.c中,虽然代码看起来非常冗长,但是其中大部分都是处理各种特性以及硬件卸载功能的,主要逻辑却非常简单。

负责数据包的收发的主函数为:

uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id, struct rte_mbuf **pkts, uint16_t count)
//数据包流向 OVS 到 VM
uint16_t rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
    struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
//数据包流向 VM 到 OVS

具体的发送过程概括来说就是,如果OVS往VM发送数据包,对应的vhost端口去avail ring中读取可用的buffer地址,转换成VVA后,进行数据包拷贝,拷贝完成后发送eventfd通知VM;如果VM往OVS发送,则相反,从VM内的数据包缓冲区拷贝到DPDK的mbuf数据结构。以下贴一段代码注释吧,不要管里面的iommu、iova,那些都是vhost-user的新特性,可用理解为iova就是GPA。

uint16_t
rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
    struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
    struct virtio_net *dev;
    struct rte_mbuf *rarp_mbuf = NULL;
    struct vhost_virtqueue *vq;
    uint32_t desc_indexes[MAX_PKT_BURST];
    uint32_t used_idx;
    uint32_t i = 0;
    uint16_t free_entries;
    uint16_t avail_idx;

    dev = get_device(vid);   //根据vid获取dev实例
    if (!dev)
        return 0;

    if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->nr_vring))) {
        RTE_LOG(ERR, VHOST_DATA, "(%d) %s: invalid virtqueue idx %d.
",
            dev->vid, __func__, queue_id);
        return 0;
    }          //检查虚拟队列id是否合法

    vq = dev->virtqueue[queue_id];      //获取该虚拟队列

    if (unlikely(rte_spinlock_trylock(&vq->access_lock) == 0))  //对该虚拟队列加锁
        return 0;

    if (unlikely(vq->enabled == 0))     //如果vq不可访问,对虚拟队列解锁退出
        goto out_access_unlock;

    vq->batch_copy_nb_elems = 0;  //批处理需要拷贝的数据包数目

    if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
        vhost_user_iotlb_rd_lock(vq);

    if (unlikely(vq->access_ok == 0))
        if (unlikely(vring_translate(dev, vq) < 0))  //因为IOMMU导致的,要翻译iova_to_vva
            goto out;

    if (unlikely(dev->dequeue_zero_copy)) {   //零拷贝dequeue
        struct zcopy_mbuf *zmbuf, *next;
        int nr_updated = 0;

        for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
             zmbuf != NULL; zmbuf = next) {
            next = TAILQ_NEXT(zmbuf, next);

            if (mbuf_is_consumed(zmbuf->mbuf)) {
                used_idx = vq->last_used_idx++ & (vq->size - 1);
                update_used_ring(dev, vq, used_idx,
                         zmbuf->desc_idx);
                nr_updated += 1;

                TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
                restore_mbuf(zmbuf->mbuf);
                rte_pktmbuf_free(zmbuf->mbuf);
                put_zmbuf(zmbuf);
                vq->nr_zmbuf -= 1;
            }
        }

        update_used_idx(dev, vq, nr_updated);
    }

    /*
     * Construct a RARP broadcast packet, and inject it to the "pkts"
     * array, to looks like that guest actually send such packet.
     *
     * Check user_send_rarp() for more information.
     *
     * broadcast_rarp shares a cacheline in the virtio_net structure
     * with some fields that are accessed during enqueue and
     * rte_atomic16_cmpset() causes a write if using cmpxchg. This could
     * result in false sharing between enqueue and dequeue.
     *
     * Prevent unnecessary false sharing by reading broadcast_rarp first
     * and only performing cmpset if the read indicates it is likely to
     * be set.
     */
    //构造arp包注入到mbuf(pkt数组),看起来像是虚拟机发送的
    if (unlikely(rte_atomic16_read(&dev->broadcast_rarp) &&
            rte_atomic16_cmpset((volatile uint16_t *)
                &dev->broadcast_rarp.cnt, 1, 0))) {

        rarp_mbuf = rte_pktmbuf_alloc(mbuf_pool);    //从mempool中分配一个mbuf给arp包
        if (rarp_mbuf == NULL) {
            RTE_LOG(ERR, VHOST_DATA,
                "Failed to allocate memory for mbuf.
");
            return 0;
        }
        //构造arp报文,构造成功返回0
        if (make_rarp_packet(rarp_mbuf, &dev->mac)) {
            rte_pktmbuf_free(rarp_mbuf);
            rarp_mbuf = NULL;
        } else {
            count -= 1;
        }
    }
    //计算有多少数据包,现在的avail的索引减去上次停止时的索引值,若没有直接释放vq锁退出
    free_entries = *((volatile uint16_t *)&vq->avail->idx) -
            vq->last_avail_idx;
    if (free_entries == 0)
        goto out;

    LOG_DEBUG(VHOST_DATA, "(%d) %s
", dev->vid, __func__);

    /* Prefetch available and used ring */
    //预取前一次used和avail ring停止位置索引
    avail_idx = vq->last_avail_idx & (vq->size - 1);
    used_idx  = vq->last_used_idx  & (vq->size - 1);
    rte_prefetch0(&vq->avail->ring[avail_idx]);
    rte_prefetch0(&vq->used->ring[used_idx]);

    //此次接收过程的数据包数目,为待处理数据包总数、批处理数目最小值
    count = RTE_MIN(count, MAX_PKT_BURST);
    count = RTE_MIN(count, free_entries);
    LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers
",
            dev->vid, count);

    /* Retrieve all of the head indexes first to avoid caching issues. */
    //从avail ring中取得所有数据包在desc中的索引,存在局部变量desc_indexes数组中
    for (i = 0; i < count; i++) {
        avail_idx = (vq->last_avail_idx + i) & (vq->size - 1);
        used_idx  = (vq->last_used_idx  + i) & (vq->size - 1);
        desc_indexes[i] = vq->avail->ring[avail_idx];

        if (likely(dev->dequeue_zero_copy == 0))    //若不支持dequeue零拷贝的话,直接将索引写入used ring中
            update_used_ring(dev, vq, used_idx, desc_indexes[i]);
    }

    /* Prefetch descriptor index. */
    rte_prefetch0(&vq->desc[desc_indexes[0]]);  //从desc中预取第一个要发的数据包描述符
    for (i = 0; i < count; i++) {
        struct vring_desc *desc, *idesc = NULL;
        uint16_t sz, idx;
        uint64_t dlen;
        int err;

        if (likely(i + 1 < count))
            rte_prefetch0(&vq->desc[desc_indexes[i + 1]]); //预取后一项

        if (vq->desc[desc_indexes[i]].flags & VRING_DESC_F_INDIRECT) {  //如果该项支持indirect desc,按照indirect处理
            dlen = vq->desc[desc_indexes[i]].len;
            desc = (struct vring_desc *)(uintptr_t)     //地址转换成vva
                vhost_iova_to_vva(dev, vq,
                        vq->desc[desc_indexes[i]].addr,
                        &dlen,
                        VHOST_ACCESS_RO);
            if (unlikely(!desc))
                break;

            if (unlikely(dlen < vq->desc[desc_indexes[i]].len)) {
                /*
                 * The indirect desc table is not contiguous
                 * in process VA space, we have to copy it.
                 */
                idesc = alloc_copy_ind_table(dev, vq,
                        &vq->desc[desc_indexes[i]]);
                if (unlikely(!idesc))
                    break;

                desc = idesc;
            }

            rte_prefetch0(desc);   //预取数据包
            sz = vq->desc[desc_indexes[i]].len / sizeof(*desc);
            idx = 0;
        } 
        else {
            desc = vq->desc;    //desc数组
            sz = vq->size;      //size,个数
            idx = desc_indexes[i];  //desc索引项
        }

        pkts[i] = rte_pktmbuf_alloc(mbuf_pool);   //给正在处理的这个数据包分配mbuf
        if (unlikely(pkts[i] == NULL)) {    //分配mbuf失败,跳出包处理
            RTE_LOG(ERR, VHOST_DATA,
                "Failed to allocate memory for mbuf.
");
            free_ind_table(idesc);
            break;
        }

        err = copy_desc_to_mbuf(dev, vq, desc, sz, pkts[i], idx,
                    mbuf_pool);
        if (unlikely(err)) {
            rte_pktmbuf_free(pkts[i]);
            free_ind_table(idesc);
            break;
        }

        if (unlikely(dev->dequeue_zero_copy)) {
            struct zcopy_mbuf *zmbuf;

            zmbuf = get_zmbuf(vq);
            if (!zmbuf) {
                rte_pktmbuf_free(pkts[i]);
                free_ind_table(idesc);
                break;
            }
            zmbuf->mbuf = pkts[i];
            zmbuf->desc_idx = desc_indexes[i];

            /*
             * Pin lock the mbuf; we will check later to see
             * whether the mbuf is freed (when we are the last
             * user) or not. If that's the case, we then could
             * update the used ring safely.
             */
            rte_mbuf_refcnt_update(pkts[i], 1);

            vq->nr_zmbuf += 1;
            TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
        }

        if (unlikely(!!idesc))
            free_ind_table(idesc);
    }
    vq->last_avail_idx += i;

    if (likely(dev->dequeue_zero_copy == 0)) {  //实际此次批处理的所有数据包内容拷贝
        do_data_copy_dequeue(vq);
        vq->last_used_idx += i;
        update_used_idx(dev, vq, i);  //更新used ring的当前索引,并eventfd通知虚拟机接收完成
    }

out:
    if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
        vhost_user_iotlb_rd_unlock(vq);

out_access_unlock:
    rte_spinlock_unlock(&vq->access_lock);

    if (unlikely(rarp_mbuf != NULL)) {  //再次检查有arp报文需要发送的话,就加入到pkts数组首位,虚拟交换机的mac学习表就能第一时间更新
        /*
         * Inject it to the head of "pkts" array, so that switch's mac
         * learning table will get updated first.
         */
        memmove(&pkts[1], pkts, i * sizeof(struct rte_mbuf *));
        pkts[0] = rarp_mbuf;
        i += 1;
    }

    return i;
}

在软件实现的网络功能中大量使用批处理,而且一批的数目是可以自己设置的,但是一般约定俗成批处理最多32个数据包。


 

参考:https://www.jianshu.com/p/ae54cb57e608

  for (i = 0; i < nb_rxq; i++) {                vq = dev->data->rx_queues[i];                if (!vq) {                        VHOST_LOG(INFO, "rxq-%d not setup yet, skip! ", i);                        continue;                }
                ret = rte_vhost_get_vhost_vring(vq->vid, (i << 1) + 1, &vring);                if (ret < 0) {                        VHOST_LOG(INFO,                                "Failed to get rxq-%d's vring, skip! ", i);                        continue;                }
                if (vring.kickfd < 0) {                        VHOST_LOG(INFO,                                "rxq-%d's kickfd is invalid, skip! ", i);                        continue;                }                dev->intr_handle->intr_vec[i] = RTE_INTR_VEC_RXTX_OFFSET + i;                dev->intr_handle->efds[i] = vring.kickfd;                count++;                VHOST_LOG(INFO, "Installed intr vec for rxq-%d ", i);        }

原文地址:https://www.cnblogs.com/dream397/p/14155582.html