《Monitoring and Tuning the Linux Networking Stack: Receiving Data》翻译

Overview

从宏观的角度来看,一个packet从网卡到socket接收缓冲区的路径如下所示:

  1. 驱动加载并初始化
  2. packet到达网卡
  3. packet通过DMA被拷贝到内核中的一个ring buffer
  4. 产生一个硬件中断,让系统知道已经有个packet到达内存
  5. 驱动会调用NAPI启动一个poll loop,如果它还没启动的话
  6. 系统的每个CPU上都有一个ksoftirqd进程,它们都是在系统启动的时候就已经注册了的。ksoftirqd进程会调用NAPI的poll函数从ring buffer中将packet取出,而poll函数是设备驱动程序在初始化的时候注册的。
  7. 那些已经写入数据的ring buffer的内存区域会被unmapped
  8. 那些通过DMA写入内存的数据会以"skb"的形式传递给网络层进行进一步的处理
  9. 如果packet steering功能开启或者网卡有多个receive queue,则接收到的packet会被分发到多个CPU上
  10. 队列中的数据会被传递到protocol layer
  11. protocol layer会对数据进行处理
  12. 数据最终会通过protocl layers加入所属socket的receive buffer

整个流程会在下文的各个章节中进行详细的描述,而下文中的protocol layer会以IP和UDP作为例子,但是其中的很多内容,对于其他protocol layer都是通用的。

Detailed Look

本文将会以igb驱动程序作为例子,并用它来控制一个比较常见的服务器网卡Intel I350。因此,我们首先来看看igb设备驱动程序是怎么工作的。

Network Device Driver

Initialization

驱动程序会利用module_init宏注册一个初始化函数,当内核加载驱动程序时,该函数就会被调用。igb初始化函数(igb_init_module)和它利用module_init进行注册的代码如下:

/**
 *  igb_init_module - Driver Registration Routine
 *
 *  igb_init_module is the first routine called when the driver is
 *  loaded. All it does is register with the PCI subsystem.
 **/
static int __init igb_init_module(void)
{
  int ret;
  pr_info("%s - version %s
", igb_driver_string, igb_driver_version);
  pr_info("%s
", igb_copyright);

  /* ... */

  ret = pci_register_driver(&igb_driver);
  return ret;
}

module_init(igb_init_module);

  

其中初始化设备的大部分工作都是由pci_register_driver来完成的。我们将在下面详细介绍。

PCI initialization

Intel I350是一个PCI express设备。PCI设备通过PCI Configuration Space中的一些寄存器标识自己。

当一个设备驱动程序被编译时,会用一个叫做MODULE_DEVICE_TABLE的宏来创建一个table,用该table来包含该设备驱动程序可以控制的PCI设备的设备ID。接着这个table也会被注册为一个结构的一部分,我们在下面马上就能看到。

而内核最终将会使用这个table来决定该加载哪个驱动程序来控制该设备。

操作系统就是这样确定哪个设备和系统连接了,以及该使用哪个驱动来和该设备进行交互。

static DEFINE_PCI_DEVICE_TABLE(igb_pci_tbl) = {
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_1GBPS) },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_SGMII) },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_2_5GBPS) },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I211_COPPER), board_82575 },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER), board_82575 },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_FIBER), board_82575 },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES), board_82575 },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SGMII), board_82575 },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER_FLASHLESS), board_82575 },
  { PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES_FLASHLESS), board_82575 },

  /* ... */
};
MODULE_DEVICE_TABLE(pci, igb_pci_tbl);

  

从上文已知,在驱动的初始化函数中会调用pci_register_driver函数。

该函数会注册一个满是指针的结构,其中大多数的指针都是函数指针,不过包含PCI device ID的table同样会被注册。内核会利用这些驱动注册的函数来启动PCI设备。

static struct pci_driver igb_driver = {
  .name     = igb_driver_name,
  .id_table = igb_pci_tbl,
  .probe    = igb_probe,
  .remove   = igb_remove,

  /* ... */
};

  

PCI probe

一旦一个设备通过它的PCI ID被识别,内核就会选择合适的驱动程序来控制该设备。每一个PCI设备驱动程序都在内核的PCI子系统中注册了一个probe function。对于还没有驱动控制的设备,内核会调用该函数,直到和某个驱动程序相匹配。大多数驱动程序都有大量的代码用来控制设备。具体的操作各个驱动也有所不同。但是一些典型的操作如下所示:

  1. 启动PCI设备
  2. 申请内存和IO端口
  3. 设置DMA mask
  4. 注册设备驱动程序支持的ethtool function
  5. 有必要的话,启动watch dog task(比如,e1000 有一个watchdog task来确认硬件是否挂起)
  6. 处理一些该设备特有的问题
  7. 创建,初始化和注册一个struct net_device_ops结构,该结构包含一系列的函数指针,指向各种例如打开设备,发送数据,设置mac地址以及其他一些功能
  8. 创建,初始化和注册一个struct net_device用来代表一个网络设备

让我们来快速浏览一下,igd里对应的igb_probe是如何完成上述操作的

A peek into PCI initialization

接下来的这些代码取自igb_probe函数,主要用于一些基本的PCI配置

err = pci_enable_device_mem(pdev);

/* ... */

err = dma_set_mask_and_coherent(&pdev->dev, DMA_BIT_MASK(64));

/* ... */

err = pci_request_selected_regions(pdev, pci_select_bars(pdev,
           IORESOURCE_MEM),
           igb_driver_name);

pci_enable_pcie_error_reporting(pdev);

pci_set_master(pdev);
pci_save_state(pdev);

  

 首先,设备会由pci_enable_device_mem初始化,如果该设备处于暂停状态就会被唤醒,获取内存资源以及其他一些工作。接着会对DMA mask进行设置,因为该设备会读写64位的内存地址,因此dma_set_mask_and_coherent的参数为DMA_BIT_MASK(64)。然后调用pci_request_selected_regions获取内存,同时使能PCI Express Advanced Error Reporting功能,最终调用pci_set_master使能DMA并且调用pci_save_state保存PCI configuration space。

Network device initialization

igb_probe函数做了大量关于网络设备初始化的工作。除了一些针对PCI的工作以外,它还需要做如下这些工作:

  1. 注册struct net_device_ops结构
  2. 注册ethtool的相关操作
  3. 从网卡中获取默认的mac地址
  4. 设置net_device中的feature flags
  5. 以及其他一些工作

下面我们对上述的每一部分进行详细的分析。

struct net_device_ops

struct net_device_ops中包含许多函数指针指向一些网络子系统用来操作设备的重要功能。我们将在接下来的内容中多次提及此结构。在igb_probe中net_device_ops结构将会和struct net_device绑定,代码如下:

static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
  /* ... */

  netdev->netdev_ops = &igb_netdev_ops;

  

而net_device_ops结构中的各个指针指向的函数也定义在同一个文件中:

static const struct net_device_ops igb_netdev_ops = {
  .ndo_open               = igb_open,
  .ndo_stop               = igb_close,
  .ndo_start_xmit         = igb_xmit_frame,
  .ndo_get_stats64        = igb_get_stats64,
  .ndo_set_rx_mode        = igb_set_rx_mode,
  .ndo_set_mac_address    = igb_set_mac,
  .ndo_change_mtu         = igb_change_mtu,
  .ndo_do_ioctl           = igb_ioctl,

  /* ... */

  

我们可以看到,这个结构中包含很多有趣的字段,例如ndo_open,ndo_stop,ndo_start_xmit和ndo_get_stats64,他们都包含了igb驱动实现的对应函数的地址。我们下面会对其中的某些内容做进一步的分析。

ethtool registration

ethtool是一个命令行工具,用来获取和设置各种驱动和硬件相关的选项。通常会利用ethtool来从网络设备收集一些详细的数据。

ethtool通过ioctl系统调用和设备驱动程序交互。设备驱动程序注册了一系列的函数用于ethtool的操作。当ethtool发出一个ioctl调用时,内核会找到对应驱动的ethtool结构并且执行相应的注册函数。驱动的ethtool函数可以做许多事情,包括修改驱动中一个简单的falg,乃至通过写设备的寄存器来调整真实设备。

igb驱动通过在igb_probe中调用igb_set_ethtool_ops来注册ethtool的各个操作。

static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
  /* ... */

  igb_set_ethtool_ops(netdev);

  

void igb_set_ethtool_ops(struct net_device *netdev)
{
  SET_ETHTOOL_OPS(netdev, &igb_ethtool_ops);
}

  

static const struct ethtool_ops igb_ethtool_ops = {
  .get_settings           = igb_get_settings,
  .set_settings           = igb_set_settings,
  .get_drvinfo            = igb_get_drvinfo,
  .get_regs_len           = igb_get_regs_len,
  .get_regs               = igb_get_regs,
  /* ... */

  

每个驱动都能自己决定哪些ethtool函数和自己有关并且决定实现其中的哪些。并不是每个驱动都需要实现所有的ethtool函数。其中一个比较有趣的ethtool函数是get_ethtool_stats,它会创建一些非常详细的计数器进行追踪,它们要么位于驱动中,要么位于设备内。

IRQs

当一个数据帧通过DMA被写入RAM时,网卡是如何通知系统的其余部分,已经有数据可以处理了呢?

一般网卡会产生一个interrupt request(IRQ)表示有数据到了。有以下三种IRQ类型:MSI-X,MSI和legacy IRQ。但是如果有大量的数据帧到达时,就会导致产生大量的IRQ。而产生的IRQ越多,那么用于high level task,例如用户进程的CPU时间就越少。

于是创建了New API(NAPI)这种机制,用于减少数据包的到来导致设备产生中断的数目。尽管NAPI可以减少IRQ的数目,但是并不能完全避免。下面的章节会告诉我们原因。

NAPI

NAPI在许多方面和获取数据传统的方式不同。NAPI允许设备驱动程序注册一个poll函数,NAPI子系统会调用它来获取数据帧。

NAPI一般的使用方式如下:

  1. NAPI由驱动使能,但是开始仍处于关闭状态
  2. 一个packet到达并由网卡通过DMA到内存
  3. 网卡产生一个IRQ,从而触发了驱动中的IRQ handler
  4. 驱动利用一个softirq唤醒NAPI子系统,它会在另一个线程中调用驱动注册的poll函数来获取packet
  5. 驱动接着会屏蔽网卡发出的所有IRQ,因为这能让NAPI子系统处理packet并且不受来自设备的中断的影响
  6. 一旦没有更多的工作需要做了,NAPI子系统会被关闭,而来自设备的IRQ又会被开启
  7. 跳到步骤2

上述这种收集数据的方式和传统方式相比能够有效减少overhead,因为一次能处理很多数据,而不需要每个数据帧产生一次IRQ。设备驱动程序实现了poll函数并通过调用netif_napi_add将它注册到NAPI中。当通过netif_napi_add向NAPI注册poll时,驱动同时会声明一个weight,大多数驱动都会将它固定为64。该值的意义将会在下文讨论。

通常,驱动程序会在初始化的时候注册他们的NAPI poll函数。

NAPI initialization in the igb driver

igb驱动通过如下一个长长的调用链来实现NAPI的初始化:

  1. igb_probe调用igb_sw_init
  2. igb_sw_init调用igb_init_interrupt_scheme
  3. igb_init_interrupt_scheme调用igb_alloc_q_vectors
  4. igb_alloc_q_vectors调用igb_alloc_q_vector
  5. igb_alloc_q_vector调用netif_napi_add

这个调用链会导致一些上层的事发生:

  1. 如果支持MSI-X,它会通过调用pci_enable_msix使能
  2. 许多设置被初始化:尤其是设备和驱动用来发送接收包的发送和接收队列的数目
  3. 每次创建一个传输或者接收队列都会调用一次igb_alloc_q_vector
  4. 每次调用igb_alloc_q_vector都会调用netif_napi_add为该队列注册一个poll函数,并且每次调用poll函数接收数据时都会传递给它一个struct napi_struct的实例。

让我们来看一看igb_alloc_q_vector是如何注册poll回调函数以及它的私有数据的

static int igb_alloc_q_vector(struct igb_adapter *adapter,
                              int v_count, int v_idx,
                              int txr_count, int txr_idx,
                              int rxr_count, int rxr_idx)
{
  /* ... */

  /* allocate q_vector and rings */
  q_vector = kzalloc(size, GFP_KERNEL);
  if (!q_vector)
          return -ENOMEM;

  /* initialize NAPI */
  netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64);

  /* ... */

  

上述代码为receive queue分配了内存,并且向NAPI子系统注册了igb_poll函数。其中的参数包含了新创建的receive queue相关的struct napi_struct的引用(&q_vector->napi)。当需要从receive queue中接收数据包时,NAPI子系统会将它传输给igb_poll函数。这对于我们以后研究数据流从驱动发送网络栈的过程是非常重要的。

Bring a network device up

回忆一下之前说过的net_device_ops结构,它注册了一系列函数用于启动设备,传输包,设置mac地址等等。当一个网络设备被启动时(比如,调用ifconfig eth0 up) ,和net_device_ops结构中的ndo_open域相关的函数就会被调用。

ndo_open函数会做如下操作:

  1. 获取receive queue和send queue的内存
  2. 使能NAPI
  3. 注册interrupt handler
  4. 使能hardware interrupts
  5. 以及其他一些工作

在igb驱动中,和net_device_ops结构中的ndo_open相关的函数为igb_open。

Preparing to receive data from the network

现在大多数的网卡都利用DMA直接将数据写入RAM,从而让操作系统能直接获取数据进行处理。许多网卡为此使用的数据结构类似于创建在环形缓冲区上的队列。为了实现DMA,设备驱动程序必须和操作系统合作,保留一些内存可供网卡使用。一旦区域确定,网卡会得到关于通知,并且会将收到的数据都写入其中。之后这些数据会被取出并交由网络子系统处理。

这些都非常简单,但是如果数据包到达的过快,单个CPU不能很好地处理所有的数据包怎么办?因为该数据结构是基于一个固定大小的内存区域,因此接收到的包将会被丢弃。这个时候,像Receive Side Scaling(RSS)或multiqueue就能派上用场了。有的设备有能力同时将包写入不同的RAM,每个区域都是一个单独的队列。这就允许操作系统在硬件层面使用多个CPU并行处理获取的数据。但是这个特性并不是被所有网卡支持的。不过Intel I350是支持multiple queue的。我们可以看到在igb驱动中,它在启动时就是调用一个叫igb_setup_all_rx_resources的函数。而它又会调用另一个函数,igb_setup_rx_resources,用于让receive queue处理DMA内存。事实上,receive queue的数目和长度可以通过ethtool进行调整。对该这些值进行调整,我们可以看到对于已处理包的数目和已丢弃包数目比例的影响。

网卡一般使用基于packet header field(源地址,目的地址,端口)的哈希函数来确定某个包该发往哪个receive queue。有的网卡还允许你调整某些receive queue的权重,从而让某些特定的队列处理更多的流量。还有的网卡甚至允许你调整哈希函数。这样的话,你可以将特定的数据流发往特定的receive queue进行处理,甚至在硬件层面就将包丢弃。接下来我们很快会看到如何对这些设置进行调整。

Enable NAPI

当打开一个设备时,驱动通常会使能NAPI。之前我们看到了驱动是如何注册NAPI的poll函数的,但是直到打开设备前,NAPI都不是使能的。使能NAPI其实非常简单直接,调用napi_enable翻转struct napi_struct 中的一个位就表示NAPI使能了。如上所述,尽管NAPI使能了,但是它仍然可能处于关闭状态。在igb driver中,每个q_vector的NAPI都会在设备打开或者利用ethtool改变队列的数目或大小时使能。

for (i = 0; i < adapter->num_q_vectors; i++)
  napi_enable(&(adapter->q_vector[i]->napi));

  

Register an interrupt handler

使能了NAPI之后,下一步就是注册一个interrupt handler。现在有好几种方式用于发生一个中断:MSI-X,MSI以及legacy interrupts。因此,这一部分的代码对于不同的驱动都是不同的,这取决于特定的硬件支持哪种中断方式。驱动必须确定设备支持哪种中断方式,并且注册合适的处理方法,从而能在中断发生时进行处理。有些驱动,例如igb会为每种方法注册一个interrupt handler,一种方法失败就换另一种。对于支持multiple receive queue的网卡来说,MSI-X是更好的方法。这样的话,每个receive queue都有自己的hardware interrupt,从而能被特定的CPU处理(通过irqbalance或修改/proc/irq/IRQ_NUMBER/smp_affinity)。我们很快就能看到,处理中断的CPU也将是对包进行处理的CPU。这样一来,收到的包就能从hardware interrupt开始直到整个网络栈都由不同的CPU处理。

如果MSI-X不能用,MSI仍然要优于legacy interrupts。在igb驱动中,函数igb_msix_ring,igb_intr_msi和igb_intr分别是MSI-X,MSI和legacy interrupt对应的interrupt handler。

static int igb_request_irq(struct igb_adapter *adapter)
{
  struct net_device *netdev = adapter->netdev;
  struct pci_dev *pdev = adapter->pdev;
  int err = 0;

  if (adapter->msix_entries) {
    err = igb_request_msix(adapter);
    if (!err)
      goto request_done;
    /* fall back to MSI */

    /* ... */
  }

  /* ... */

  if (adapter->flags & IGB_FLAG_HAS_MSI) {
    err = request_irq(pdev->irq, igb_intr_msi, 0,
          netdev->name, adapter);
    if (!err)
      goto request_done;

    /* fall back to legacy interrupts */

    /* ... */
  }

  err = request_irq(pdev->irq, igb_intr, IRQF_SHARED,
        netdev->name, adapter);

  if (err)
    dev_err(&pdev->dev, "Error %d getting interrupt
", err);

request_done:
  return err;
}

  

从上面的代码我们可以看到,驱动会首先尝试利用igb_request_msix设置MSI-X的interrupt handler,如果失败的话,进入MSI。request_irq用于注册MSI的interrupt handler,igb_intr_mis。如果这也失败了,则会进入legacy interrupts。这个时候会再次使用request_irq注册legacy interrupt的interrupt handler,igb_intr。igb的驱动就是这样注册一个函数用于处理,当网卡发出中断说明有数据到达并已经准备好接受处理了。

Enable Interrupts

到现在为止,基本上所有事情都设置完毕了。唯一剩下的就是打开中断并且等待数据的到来。打开中断对于每个设备都是不一样的,对于igb驱动,它是在__igb_open中通过调用igb_irq_enable完成的。一般,打开中断都是通过写设备的寄存器完成的:

static void igb_irq_enable(struct igb_adapter *adapter)
{

  /* ... */

    wr32(E1000_IMS, IMS_ENABLE_MASK | E1000_IMS_DRSTA);
    wr32(E1000_IAM, IMS_ENABLE_MASK | E1000_IMS_DRSTA);

  /* ... */
}

  

The network device is now up

驱动可能还需要做另外一些事,例如启动定时器,work queue,或者其他硬件相关的设置。一旦这些都完成了,那么设备就已经启动并准备好投入使用了。

SoftIRQs

在深入网络栈之前,我们先要了解一下Linux内核中一个叫做SoftIRQ的东西

What is a softirq

Linux内核中的softirq system是一种能够让代码到interrupt handler上下文之外执行的一种机制。它非常重要,因为在几乎所有的interrupt handler的执行过程中,hardware interrupts都是关闭的。而中断关闭的时间越长,那么就越有可能丢失某些event。因此我们可以把一些执行时间较长的代码放到interrupt handler之外执行,这样就能让它快点完成从而恢复中断。在内核中,还有其他的机制能够延迟代码的执行,但是对于网络栈来说,我们选择softirqs。

softirq system可以被看成是一系列的kernel thread(每个CPU一个),它们会对不同的softirq event运行不同的处理函数。如果你观察过top命令的输出,并且在一系列的kernel threads中看到了一个ksoftirqd/0,那么它就是运行在CPU 0上的一个softirq kernel thread。

内核子系统可以通过运行open_softirq函数来注册一个softirq handler。我们下面将看到的是网络子系统如何注册它的softirq handlers。现在,我们先来学习一下softirq是如何工作的。

ksoftirqd

因为softirq对于推迟设备驱动工作的执行太过重要了,你可以想象,它一定在内核整个生命周期中很早的时候就开始执行了。下面我们来看看ksoftirqd系统是如何初始化的:

static struct smp_hotplug_thread softirq_threads = {
  .store              = &ksoftirqd,
  .thread_should_run  = ksoftirqd_should_run,
  .thread_fn          = run_ksoftirqd,
  .thread_comm        = "ksoftirqd/%u",
};

static __init int spawn_ksoftirqd(void)
{
  register_cpu_notifier(&cpu_nfb);

  BUG_ON(smpboot_register_percpu_thread(&softirq_threads));

  return 0;
}
early_initcall(spawn_ksoftirqd);

  

你可以看到上面struct smp_hotplug_thread的定义,其中注册了两个函数指针:ksoftirqd_should_run和run_softirqd。这两个函数都会在kernel/smpboot.c中被调用,用来构成一个event loop。kernel/smpboot.c中的代码首先会调用ksoftirqd_should_run来确定是否还有pending softirq,如果有的话,就执行run_softirqd。run_ksoftirqd会在调用__do_softirq之前执行一些minor bookkeeping。

__do_softirq

__do_softirq函数主要做以下这些事:

  • 确定哪些softirq被挂起了
  • 记录时间
  • 更新softirq执行次数
  • 被挂起的softirq的softirq handler(在open_softirq被注册)被执行

因此,现在你看CPU的使用图,其中的softirq或si代表的就是用于这些deferred work所需的时间。

Linux network device subsysem

既然我们已经大概了解了网卡驱动和softirq是如何工作的,接下来我们来看看Linux network device subsystem是如何初始化的。接着我们将追踪一个包从它到达网卡之后所走过的整条路径。

Initialization of network device system

network device (netdev) subsystem是在函数net_dev_init中初始化的。有许多有趣的事情在这个初始化函数中发生。

Initialization of struct softnet_data structures

net_dev_init会为每个CPU都创建一个struct softnet_data。这个结构会包含很多指针用于处理网络数据:

  • 一系列注册到该CPU的NAPI结构
  • 一个backlog用于数据处理
  • processing weight
  • 一个receive offload 结构的列表
  • 对于Receive packet steering的设置
  • 以及其他

其中的每一部分我们都会在下文中详细叙述。

Initialization of softirq handlers

net_dev_init注册了一个receive softirq handler和transmit softirq handler分布用于处理输入和输出的数据。代码如下:

static int __init net_dev_init(void)
{
  /* ... */

  open_softirq(NET_TX_SOFTIRQ, net_tx_action);
  open_softirq(NET_RX_SOFTIRQ, net_rx_action);

 /* ... */
}

  

我们很快就能看到驱动的interrupt handler是如何触发NET_RX_SOFTIRQ的net_rx_action函数的

Data arrives

终于,数据来了!

假设receive queue有足够的descriptors,packet会直接通过DMA写入RAM。之后设备就会产生一个相应的中断(或者在MSI-X中,是packet到达的receive queue对应的中断)

Interrupt handler

一般来说,当一个中断对应的interrupt handler运行时,它应该将尽量多的工作都放到中断上下文之外进行。这非常重要,因为在一个中断执行的过程中,其他中断都阻塞了。让我们来看看MSI-X interrupt handler的源码,它能很好地解释,为什么interrupt handler应该尽可能地少做工作。

static irqreturn_t igb_msix_ring(int irq, void *data)
{
  struct igb_q_vector *q_vector = data;

  /* Write the ITR value calculated from the previous interrupt. */
  igb_write_itr(q_vector);

  napi_schedule(&q_vector->napi);

  return IRQ_HANDLED;
}

  

这个interrupt handler非常短,在返回之前仅仅做了两个很快的操作。首先,它调用了igb_write_itr,更新了一下硬件相关的寄存器。在这个例子中,被更新的寄存器是用于追踪hardware interrupt到达速率的。这个寄存器通常和一个叫"Interrupt Throttling"(或者叫"Interrupt Coalescing")的硬件特性相结合,它用来调整中断发往CPU的速率。我们很快可以看到ethtool提供了一种机制,能够调节IRQ发生的速率。

接着调用napi_schedule用来唤醒NAPI processing loop(如果它不在运行的话)。注意的是NAPI processing loop是在softirq运行的,而不是在interrupt handler中。interrupt handler只是简单地让它开始执行,如果它没有准备好的话。

真正的代码会展现这些工作会是多么重要,它会帮助我们理解网络数据是如何在多CPU系统中处理的。

NAPI and napi_schedule

让我们来看看hardware interrupt handler中调用的napi_schedule是如何工作的。

要记住,NAPI存在的目的就是在不需要网卡发送中断,表示已经有数据可以准备处理的情况下也能接收数据。如上文所述,NAPI的poll loop会在收到一个hardware interrupt后生成。换句话说:NAPI是使能的,但是处于关闭状态,直到网卡产生一个IRQ表示第一个packet到达,NAPI才算打开。当然还有其他一些情况,我们很快就能看到,NAPI会被关闭,直到一个hardware interrupt让它重新开启。

NAPI poll loop会在驱动的interrupt handler调用napi_schedule后启动。不过napi_schedule只是一个包装函数,它直接调用了__napi_schedule

/**
 * __napi_schedule - schedule for receive
 * @n: entry to schedule
 *
 * The entry's receive function will be scheduled to run
 */
void __napi_schedule(struct napi_struct *n)
{
  unsigned long flags;

  local_irq_save(flags);
  ____napi_schedule(&__get_cpu_var(softnet_data), n);
  local_irq_restore(flags);
}
EXPORT_SYMBOL(__napi_schedule);

  

该代码调用__get_cpu_var获取当前运行的CPU的softnet_data结构。接着softnet_data结构和struct napi_struct结构会被传输给__napi_schedule。

/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
                                     struct napi_struct *napi)
{
  list_add_tail(&napi->poll_list, &sd->poll_list);
  __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

  

上面的代码主要做了两件事:

  1. 从驱动程序的interrupt handler中获取的struct napi_struct会被添加到当前CPU的softnet_data结构的poll_list中
  2. __raise_softirq_irqoff用于触发一个NET_RX_SOFTIRQ softirq。这会导致在network device subsystem初始化的时候注册的net_rx_action被执行,如果它当前没有在执行的话

我们很快就能看到,softirq的处理函数net_rx_action会调用NAPI的poll函数用于获取数据

A note about CPU and network data processing

需要注意的是到目前为止我们见到的所有把任务从hardware interrupt handler推迟到softirq的代码使用的结构都是和当前CPU相关的。尽管驱动的IRQ handler只做很少的工作,但是softirq handler会和驱动的IRQ handler在同一个CPU上执行。

这就是为什么IRQ会由哪个CPU处理很重要了,因为该CPU不仅会用于执行驱动的interrupt handler,还会通过对应的NAPI在softirq中获取数据。

我们接下去将会看到,像Receive Packet Steering这样的机制会将其中的一些工作分发到其他CPU上去。

Network data processing begins

一旦softirq的代码知道了是哪个softirq被挂起了,它就会开始执行,并且调用net_rx_action,这个时候网络数据的处理就开始了。让我们来看看net_rx_action的processing loop的各个部分,了解一下它是如何工作的。

net_rx_action processing loop

net_rx_action从被DAM写入的packet所在的内存开始处理。该函数会遍历在当前CPU上排队的NAPI结构,依次取下每个结构并进行处理。processing loop指定了NAPI的poll函数所能进行的工作量以及消耗的工作时间。它通过如下两种方式实现:

  1. 通过追踪budget(可以调整)
  2. 检查经过的时间
while (!list_empty(&sd->poll_list)) {
    struct napi_struct *n;
    int work, weight;

    /* If softirq window is exhausted then punt.
     * Allow this to run for 2 jiffies since which will allow
     * an average latency of 1.5/HZ.
     */
    if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
      goto softnet_break;

  

这就是内核如何防止packet processing一直独占整个CPU。其中的budget是该CPU上每个NAPI结构的预算的总和。这就是为什么multiqueue网卡要小心地调节IRQ affinity的原因。我们直到,处理从设备发出的IRQ的CPU也会被用来处理对应的softirq handler,因此也会成为处理上述循环和budget computation的CPU。

有着multiqueue网卡的系统可能会出现这种情况,多个NAPI结构被注册到了同一个CPU上。所有的NAPI结构的处理都会消耗同一个CPU的budget。

如果你没有足够的CPU去分发网卡的IRQ,你可以考虑增加net_rx_action的budget从而允许每个CPU能处理更多的packet。增加budget会增加CPU的使用率,但是可以减小延时,因为数据处理地更及时(但是CPU的处理时间仍然是2 jiffies,不管budget是多少)。

NAPI poll function and weight

我们已经知道网卡驱动调用netif_napi_add注册poll函数。在上文中我们已经看到,igb驱动中有如下这段代码:

/* initialize NAPI */
  netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64);

  

它给NAPI结构的weight赋值为64。我们现在就会看到它是如何在net_rx_action processing loop中使用的。

weight = n->weight;

work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
        work = n->poll(n, weight);
        trace_napi_poll(n);
}

WARN_ON_ONCE(work > weight);

budget -= work;

  

首先从NAPI结构中获取weight(此处是64),然后将它传递给同样注册到NAPI结构中的poll函数(此处为igb_poll)。poll函数会返回已经被处理的帧数,并保存在work中,之后它将从budget中减去。因此,假设:

  1. 你的驱动使用的weight是64
  2. 你的budget设置的是300

你的系统将会在如下任意一种情况发生时,停止处理数据;

  1. igb_poll函数最多被调用5次(如果没有数据处理还会更少)
  2. 消耗了2 jiffies的时间

The NAPI / network device driver contract

NAPI子系统和设备驱动的交互中还未提及的一部分就是关闭NAPI的条件,包含的内容如下:

  •  如果驱动的poll函数消耗完了它的weight,它一定不能改变NAPI的状态。net_rx_action的循环会继续进行
  • 如果驱动的poll函数没有消耗完它所有的weight,它必须关闭NAPI。NAPI会在下次收到IRQ的时候重新启动并且驱动的IRQ handler会调用napi_schedule函数

我们先来看看net_rx_action如何处理第一种情况

Finishing the net_rx_action loop

/* Drivers must not modify the NAPI state if they
 * consume the entire weight.  In such cases this code
 * still "owns" the NAPI instance and therefore can
 * move the instance around on the list at-will.
 */
if (unlikely(work == weight)) {
  if (unlikely(napi_disable_pending(n))) {
    local_irq_enable();
    napi_complete(n);
    local_irq_disable();
  } else {
    if (n->gro_list) {
      /* flush too old packets
       * If HZ < 1000, flush all packets.
       */
      local_irq_enable();
      napi_gro_flush(n, HZ >= 1000);
      local_irq_disable();
    }
    list_move_tail(&n->poll_list, &sd->poll_list);
  }
}

  

如果所有的work都被消耗完了,net_rx_action需要处理以下两种情况:

  1. 网络设备需要被关闭(因为用户运行了ifconfig eth0 down)
  2. 如果设备没有被关闭,检查是否存在generic receive offload(GRO)list。如果time tick rate 大于1000,所有最近更新的GRO'd network flow都会被清除。下面我们会详细介绍GRO。将NAPI结构移到列表的尾端,并迭代至下一个NAPI运行

这就是packet processing loop如何调用驱动注册的poll函数来处理数据。我们很快将会看到,poll函数将会获取数据并将它传递到协议栈进行处理。

Exiting the loop when limits are reached

当下列情况发生时,net_rx_action的循环将会退出:

  1. 该CPU的poll list已经没有更多的NAPI结构了(!list_empty(&sd->poll_list))
  2. 剩下的budget小于等于0
  3. 2 jiffies的时间限制到了
/* If softirq window is exhausted then punt.
 * Allow this to run for 2 jiffies since which will allow
 * an average latency of 1.5/HZ.
 */
if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
  goto softnet_break;

  

softnet_break:
  sd->time_squeeze++;
  __raise_softirq_irqoff(NET_RX_SOFTIRQ);
  goto out;

  

struct softnet_data结构中的某些统计数据增加了并且softirq的NET_RX_SOFTIRQ被关闭了。其中的time_squeeze域是用来测量这样一个数据:net_rx_action还有很多工作要做,但是要么因为budget用完了,要么超时了,此类情况发生的次数。这些统计数据对于了解网络的瓶颈是非常有用的。NET_RX_SOFTIRQ被关闭从而能给其他任务腾出时间。这一小段代码的意义是,尽管还有很多工作要做,但是我们不想再独占CPU了,

执行流接着被传递给了out。当没有更多的NAPI结构需要处理,换句话说,budget比network activity更多,所有的驱动都已经关闭了NAPI,net_rx_action无事可做的时候,也会运行到out。

out段代码在从net_rx_action返回之前做了一件重要的事情:调用net_rps_action_and_irq_enable。它在Receive Packet Steering使能的情况下有着重要的作用;它会唤醒远程的CPU用于处理网络数据。

我们将在之后更多地了解RPS是如何工作的。现在让我们先走进NAPI poll函数的内部,这样我们就能向上进入网络栈了。

NAPI poll

我们已经知道设备驱动程序申请了一块内存用于让设备DMA到达packet。驱动有责任申请这些区域,同样也有责任unmap those regions,获取其中的数据并且将它发往网络栈。让我们通过观察igb driver是如何完成这些工作的,从而了解在实际过程中这些步骤是如何完成的。

igb_poll

/**
 *  igb_poll - NAPI Rx polling callback
 *  @napi: napi polling structure
 *  @budget: count of how many packets we should handle
 **/
static int igb_poll(struct napi_struct *napi, int budget)
{
        struct igb_q_vector *q_vector = container_of(napi,
                                                     struct igb_q_vector,
                                                     napi);
        bool clean_complete = true;

#ifdef CONFIG_IGB_DCA
        if (q_vector->adapter->flags & IGB_FLAG_DCA_ENABLED)
                igb_update_dca(q_vector);
#endif

        /* ... */

        if (q_vector->rx.ring)
                clean_complete &= igb_clean_rx_irq(q_vector, budget);

        /* If all work not completed, return budget and keep polling */
        if (!clean_complete)
                return budget;

        /* If not enough Rx work done, exit the polling mode */
        napi_complete(napi);
        igb_ring_irq_enable(q_vector);

        return 0;
}

  

上述代码干了如下这些有趣的事情:

  • 如果内核支持Direct Cache Access(DCA),那么CPU cache就是热的,对于RX ring的访问就会命中CPU cache
  • 接着调用igb_clean_rx_queue进行具体的操作
  • 检查clean_complete确认是否还有更多工作要做。如果有的话,返回budget。如上文所述,net_rx_action会将该NAPI结构移到poll list的尾端
  • 否则,驱动会通过调用napi_complete关闭NAPI并且通过调用igb_ring_irq_enable重新开启中断。下一个中断的到来又会开启NAPI

让我们来看看igb_clean_rx_irq是如何将数据送往协议栈的

igb_clean_rx_irq

igb_clean_rx_irq函数是一个循环,它一次处理一个packet直到到达budget或者没有多余的数据需要处理了。

函数中的循环干了如下这些非常重要的事情:

  1. 申请额外的缓存来接收数据,因为被使用的缓存已经被清除出去了,每次新加IGB_RX_BUFFER_WRITE(16)
  2. 从receive queue中获取缓存并将它存储在skb结构中
  3. 检查缓存是不是"End of Packet"。如果是的话,接着进行处理。否则接着从receive queue中获取缓存,将它们加入skb。这是必要的,因为接收到的数据帧可能比缓存大
  4. 确认数据的分布和头部是否正确
  5. 处理的字节数被保存在skb->len
  6. 设置skb的hash,checksum,timestamp,VLAN id和protocol field。hash,checksum,timestamp,VLAN id都是由硬件提供的。如果硬件声明了一个checksum error,csum_error就会增加。如果checksum成功了,并且数据是UDP或TCP数据,那么该skb就被标记为CHECKSUM_UNNECESSARY。如果checksum失败了,就交由协议栈进行处理。protocol通过调用eth_type_trans计算并且被存放在skb结构中
  7. 组织好的skb结构通过调用napi_gro_receive被传递给网络栈
  8. 已处理包的统计数据增加
  9. 循环继续,直到处理包的数目达到budget

一旦循环结束,函数会将收到的packet数和字节数加到统计数据中。

接着我们首先来聊一聊Generic Receive Offloading(GRO),之后再进入函数napi_gro_receive

Generic Receive Offloading(GRO)

Generic Receive Offloading(GRO)是硬件层面的优化Large Receive Offloading(LRO)的软件实现。这两种方法的核心思想都是通过将"类似"的包组合起来以减少传输给网络栈的包的数量,从而减少CPU的使用。例如我们要传输一个大文件,其中有许多包都包含的都是文件中的数据块。显然,我们可以不用每次都将一个small packet发往网络栈,而是将这些包组合起来,增大负载,最后让这个组合起来的包发往协议栈。这就可以让协议层只处理一个包的头部,就能传输更多的数据到用户空间。

但是这类优化的最大问题就是,信息丢失。如果一个packet中设置了一些重要的选项或者标志,如果将这个包和其他包合并,这些选项或者标志就会丢失。这也就是为什么很多人都不建议使用LRO的原因。事实上,LRO对于合并包的规则的定义是非常宽松的。

GRO作为LRO的硬件实现被引入,但是对于哪些包可以组合有着更为严格的规则

如果你有使用过tcpdump并且看到了一些大的不可思议的包,那么很有可能你的系统已经打开了GRO。你很快就能看到,抓包工具进行抓包的位置是在GRO发生之后,在协议栈的更上层。

napi_gro_receive

函数napi_gro_receive用于处理网络数据的GRO操作(如果GRO打开的话)并将数据传送到协议栈。而一个叫做dev_gro_receive的函数处理了其中的大部分逻辑

dev_gro_receive

这个函数首先检查GRO是否打开,如果打开的话,则准备进行GRO操作。当GRO打开时,首先会遍历一系列的GRO offload filter从而让上层的协议栈对要进行GRO的数据进行处理。这样协议层就能让设备层知道,该packet是否属于正在处理的network flow以及处理一些对于GRO所需要做的特定于协议的事情。例如,TCP协议需要知道是否或者何时需要给一个已经组合到现有packet的packet发送ACK

list_for_each_entry_rcu(ptype, head, list) {
  if (ptype->type != type || !ptype->callbacks.gro_receive)
    continue;

  skb_set_network_header(skb, skb_gro_offset(skb));
  skb_reset_mac_len(skb);
  NAPI_GRO_CB(skb)->same_flow = 0;
  NAPI_GRO_CB(skb)->flush = 0;
  NAPI_GRO_CB(skb)->free = 0;

  pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);
  break;
}

  

如果协议层认为是时候清除GRO packet了,之后就会调用napi_gro_complete进行处理,之后它就会调用协议层对应的gro_complete,最后再调用netif_receive_skb将包传送给网络栈

if (pp) {
  struct sk_buff *nskb = *pp;

  *pp = nskb->next;
  nskb->next = NULL;
  napi_gro_complete(nskb);
  napi->gro_count--;
}

  

如果协议层将packet合并进existing flow,napi_gro_receive就会直接返回。如果packet没有被合并,并且现在的GRO flow小于MAX_GRO_SKBS,之后就会在该NAPI的gro_list新增一个条目

if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS)
  goto normal;

napi->gro_count++;
NAPI_GRO_CB(skb)->count = 1;
NAPI_GRO_CB(skb)->age = jiffies;
skb_shinfo(skb)->gso_size = skb_gro_len(skb);
skb->next = napi->gro_list;
napi->gro_list = skb;
ret = GRO_HELD;

  

Linux网络栈中的GRO系统就是这样工作的

napi_skb_finish

一旦dev_gro_receive运行完成,napi_skb_finish就会被调用,要不就是释放因为包已经被合并就没用了的数据结构,要么调用netif_receive_skb将数据传输给网络栈(因为现在已经有MAX_GRO_SKBS个flow了)。现在是时候看看netif_receive_skb是如何将数据传输给协议层了。但是在此之前,我们先来看看什么是Receive Packet Steering(RPS)

Receive Packet Steering(RPS)

我们已经知道每个网络设备驱动都注册了一个NAPI poll函数。每个NAPI poller实例都执行在每个CPU的softirq上下文中。而处理驱动的IRQ handler的CPU会唤醒它的softirq processing loop去处理包。换句话说:处理硬件中断的CPU也会用于poll相应的输入数据。

有的硬件(例如Intel I350)在硬件层面支持multiple queue。这意味着输入的数据会被分流到不同的receive queue,并被DMA到不同的内存区域,从而会有不同的NAPI结构处理对应的区域。从而能让多个CPU并行地处理来自设备的中断并且对数据进行处理。

这个特性我们就称作Receive Side Scaling(RSS)

而Receive Packet Steering(RPS)是RSS的软件实现。因为它是由软件实现的,因此它可以用于任何网卡,即使是那些只有一个receive queue的网卡。然而,同样因为是软件层面的实现,RPS只能在包从DMA内存区域中取出之后,才能对它进行处理。这意味着,你并不会看到CPU使用在处理IRQ或者NAPI poll loop的时间下降,但是你可以从获取到包之后,对它进行负载均衡,并且从此处开始,到协议层向上减少CPU时间。

RPS通过对输入的数据计算出一个哈希值确定该由哪个CPU对其进行处理。之后,该数据会被排入每个CPU的receive network backlog等待处理。一个Inter-processor Interrupt(IPI)会被发往拥有该backlog的CPU。这会帮助触发backlog的处理,如果它当前仍未进行处理的话。/proc/net/softnet_stat中包含了每一个softnet_data中接收到的IPI的次数

因此,netif_receive_skb要么会接着将数据送往网络栈,要么就会通过RPS将它发往其他CPU进行处理

Receive Flow Steering(RFS) 

Receive Flow Steering(RFS)通常会和RPS混合使用。RPS会将输入数据在多个CPU之间进行负载均衡,但是它并不会考虑局部性从而最大化CPU cache的命中率。你可以使用RFS将属于同一个flow的数据送往同一个CPU处理,从而提高cache命中率。

Hardware accelerated Receive Flow Steering(aRFS)

RFS可以使用hardware acceleration来加速。网卡和内核可以联合起来,共同决定哪个flow需要发往哪个CPU进行处理。为了使用这一特性,你的网卡和驱动必须对它支持。如果你的网卡驱动有一个叫做ndo_rx_flow_steer的函数,那么该驱动支持accelerated RFS。

Moving up the network stack with netif_receive_skb

netif_receive_skb会在以下两个地方被调用:

  • napi_skb_finish,如果packet没有被合并进已经存在的GRO flow
  • napi_gro_complete,如果协议层表示是时候传输这个flow了

需要注意的是netif_receive_skb以及它后续调用的函数都是在softirq processing loop的上下文中进行的。netif_receive_skb首先检查一个sysctl的值用来确认用户是否要求在packet进入backlog queue之前或之后加入receive timestamp。如果有设置的话,现在就对该数据进行timestamp,在进行RPS之前。如果未被设置,则会在它加入队列之后在打timestamp。这可以将timestamp造成的负载在多个CPU间进行均衡,不过同样会引入延迟

netif_receive_skb

当timestamp被处理完之后,netif_receive_skb会根据RPS是否可用进行不同的操作。让我们先从最简单的开始:RPS不可用

Without RPS(default setting)

如果RPS不可用,首先会调用__netif_receive_skb做一些bookkeeping接着再调用__netif_receive_skb_core将数据移往协议栈。我们很快就会看到__netif_receive_skb_core是如何工作的,不过在此之前,我们先来看看RPS可用时的传输路径是怎样的,因为该代码同样会调用__netif_receive_skb_core。

With RPS enabled

如果RPS可用的话,在timestamp选项被处理完之后,netif_receive_skb会进行一些计算用于决定该使用哪个CPU的backlog queue。这是通过函数get_rps_cpu完成的

cpu = get_rps_cpu(skb->dev, skb, &rflow);

if (cpu >= 0) {
  ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
  rcu_read_unlock();
  return ret;
}

  

get_rps_cpu会将上文所述的RFS和aRFS都考虑在内,并调用enqueue_to_backlog将数据加入相应的CPU的backlog queue

enqueue_to_backlog

该函数首先获取远程CPU的softnet_data结构的指针,其中包含了一个指向input_pkt_queue的指针。接着,获取远程CPU的input_pkt_queue的队列长度

qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {

  

input_pkt_queue的长度首先和netdev_max_backlog相比较。如果队列的长度大于该值,则数据被丢弃。同样flow limit也会被检查,如果超过了,数据同样会被丢弃。这两种情况下,softnet_data节后中丢弃包的数目都将增加。注意这里的softnet_data是数据将要发往的CPU的。

enqueue_to_backlog并不会在很多地方被调用。它只会在RPS可用的包处理过程中或者netif_rx中。许多驱动不应该使用netif_rx,而应该使用netif_receive_skb。如果你不使用RPS或者你的驱动不使用netif_rx,那么增加backlog不会对你的系统产生任何影响,因为它根本就没被用到。(如果你的驱动使用netif_receive_skb并且未使用RPS,那么增加netdev_max_backlog不会产生任何性能上的提高,因为没有数据会被加入到input_pkt_queue中)

如果input_pkt_queue足够小,而也没有超过flow limit,数据就会被加入队列。大概的逻辑如下:

  • 如果队列为空,检查远端CPU的NAPI是否启动。如果没有,检查是否有IPI准备发送。如果没有,则准备一个并且通过调用__napi_schedule启动NAPI processing loop,用于处理数据
  • 如果队列不为空,或者上述操作都已做完,则将数据加入队列
if (skb_queue_len(&sd->input_pkt_queue)) {
enqueue:
         __skb_queue_tail(&sd->input_pkt_queue, skb);
         input_queue_tail_incr_save(sd, qtail);
         rps_unlock(sd);
         local_irq_restore(flags);
         return NET_RX_SUCCESS;
 }

 /* Schedule NAPI for backlog device
  * We can use non atomic operation since we own the queue lock
  */
 if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
         if (!rps_ipi_queued(sd))
                 ____napi_schedule(sd, &sd->backlog);
 }
 goto enqueue;

  

Flow limits

RPS会将包分发到多个CPU进行处理,不过一个large flow很可能会占据整个CPU,从而让其他small flow处于饥饿状态。flow limit能够让每个flow添加到backlog中的包的数目有一个最大值。这个特性可以帮助small flow同样能够得到处理,即使有larger flow的包也在入队

backlog queue NAPI poller

每个CPU的backlog queue以和设备驱动程序一样的方式插入NAPI。一个poll函数用于处理来自softirq上下文的包,同样还提供了一个weight。这个NAPI结构在网络系统初始化的时候被处理:

sd->backlog.poll = process_backlog;
sd->backlog.weight = weight_p;
sd->backlog.gro_list = NULL;
sd->backlog.gro_count = 0;

  

backlog的NAPI结构和驱动的NAPI有所不同,它的weight参数是可以调节的,而驱动程序则会将它们的NAPI weight硬编码为64。

process_backlog

process_backlog函数是一个循环,直到它的weight耗尽或者backlog中没有其他数据需要处理。每一个在backlog中的数据都将从backlog queue传输到__netif_receive_skb。一旦数据到达__netif_receive_skb之后,它的传输路径就和RPS不可用时一样了。__netif_receive_skb只是在调用__netif_receive_skb_core将数据传输到协议栈之前做一些bookkeeping。

process_backlog和驱动程序使用NAPI的方式相同:如果weight没用完,那么关闭NAPI。而poller在enqueue_to_backlog调用__napi_schedule之后被重新启动。

该函数会返回已经完成的工作量,net_rx_action会将它从budget中减去

__netif_receive_skb_core delivers data to packet taps and protocol layer

__netif_receive_skb_core用于完成将数据传往网络栈的工作。在此之前,它先确认是否安装了packet taps用来抓取输入的包。其中一个例子就是libcap使用的AF_PACKET address family。如果有这样的tap存在,则数据先被发往tap,在被发往协议层。

Packet tap delivery

如果安装了packet tap,则包将安装以下代码被发送:

list_for_each_entry_rcu(ptype, &ptype_all, list) {
  if (!ptype->dev || ptype->dev == skb->dev) {
    if (pt_prev)
      ret = deliver_skb(skb, pt_prev, orig_dev);
    pt_prev = ptype;
  }
}

  

Protocol layer delivery

一旦tap处理完成之后,__netif_receive_skb_core会将数据发往协议层。先从数据中获取protocol field,然后再遍历一系列该协议类型对应的deliver functions

type = skb->protocol;
list_for_each_entry_rcu(ptype,
                &ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
        if (ptype->type == type &&
            (ptype->dev == null_or_dev || ptype->dev == skb->dev ||
             ptype->dev == orig_dev)) {
                if (pt_prev)
                        ret = deliver_skb(skb, pt_prev, orig_dev);
                pt_prev = ptype;
        }
}

  

上文中的ptype_base是一个如下所示的哈希表:

struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;

  

每个协议层都会在哈希表给定的slot中加入一个filter,通过如下的ptype_head函数计算:

static inline struct list_head *ptype_head(const struct packet_type *pt)
{
        if (pt->type == htons(ETH_P_ALL))
                return &ptype_all;
        else
                return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}

  

将filter加入list的操作是由dev_add_pack完成的。这就是协议层如何注册自己,从而获取发往它们的数据的方法。

现在我们就知道了数据如何从网卡发往协议层

Protocol layer registration

现在我们已经知道了数据如何从网络设备发往协议栈,下面我们就来看看协议层是如何注册自己的。

IP protocol layer

IP协议层会先把自己注册到ptype_base这个哈希表中,从而让数据能够从网络设备发往它

dev_add_pack(&ip_packet_type);

  

static struct packet_type ip_packet_type __read_mostly = {
        .type = cpu_to_be16(ETH_P_IP),
        .func = ip_rcv,
};

  

__netif_receive_skb_core会调用deliver_skb,而它最终会调用func(在这里,即为ip_rcv)

ip_rcv

ip_rcv的操作非常直接,首先对数据进行检查,然后更新一些统计数据。ip_rcv会最终通过netfilter将packet发往ip_rcv_finish,从而让那些iptables中匹配IP协议层的规则能够对数据进行处理

return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish);

  

需要注意的是,如果你有非常多,非常复杂的netfilter或者iptables规则,这些规则都会在softirq上下文中执行,从而导致网络栈的延时,而这往往是不可避免的。

ip_rcv_finish

当netfilter并没有把包丢弃时,就会调用ip_rcv_finish。ip_rcv_finish开始就有一个优化,为了将包传输到合适的地方,首先要从路由系统中获取dst_entry。因此,首先需要调用该数据发往的高层协议的early_demux。early_demux首先会判断是否有dst_entry缓存在socket结构中

if (sysctl_ip_early_demux && !skb_dst(skb) && skb->sk == NULL) {
  const struct net_protocol *ipprot;
  int protocol = iph->protocol;

  ipprot = rcu_dereference(inet_protos[protocol]);
  if (ipprot && ipprot->early_demux) {
    ipprot->early_demux(skb);
    /* must reload iph, skb->head might have changed */
    iph = ip_hdr(skb);
  }
}

  

我们可以看到这部分代码是由sysctl_ip_early_demux控制的。early_demux默认是打开的。如果该优化是打开的,并且没有cached entry(因为这是第一个到达的packet),则这个packet会被发往路由系统,在那能够获取dst_entry。

一旦路由系统工作完毕之后,就会更新计数器,然后再调用dst_input(skb),它转而会调用刚刚获取的dst_entry结构中的input function pointer。

如果packet的最终目的地是本地,那么路由系统就会将ip_local_deliver赋值给dst_entry中的input function pointer。

ip_local_deliver

/*
 *      Deliver IP Packets to the higher protocol layers.
 */
int ip_local_deliver(struct sk_buff *skb)
{
        /*
         *      Reassemble IP fragments.
         */

        if (ip_is_fragment(ip_hdr(skb))) {
                if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
                        return 0;
        }

        return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
                       ip_local_deliver_finish);
}

  

和ip_rcv_finish类似,netfilter会先对packet进行检查,若未被丢弃,则调用ip_local_deliver_finish

ip_local_deliver_finish

ip_local_deliver_finish从packet中获取protocol,然后查询该protocol注册的net_protocol结构,接着再调用该net_protocol结构中的handler函数指针。这就将packet发往更高的协议层了。

Higher level protocol registration

本篇文章主要分析UDP,但是TCP protocol handler和UDP protocol handler的注册方式是相同的。在net/ipv4/af_inet.c中的函数定义包含了用于UDP,TCP和ICMP协议和IP协议层进行连接的处理函数

static const struct net_protocol tcp_protocol = {
        .early_demux    =       tcp_v4_early_demux,
        .handler        =       tcp_v4_rcv,
        .err_handler    =       tcp_v4_err,
        .no_policy      =       1,
        .netns_ok       =       1,
};

static const struct net_protocol udp_protocol = {
        .early_demux =  udp_v4_early_demux,
        .handler =      udp_rcv,
        .err_handler =  udp_err,
        .no_policy =    1,
        .netns_ok =     1,
};

static const struct net_protocol icmp_protocol = {
        .handler =      icmp_rcv,
        .err_handler =  icmp_err,
        .no_policy =    1,
        .netns_ok =     1,
};

  

这些结构都在inet address family的初始化代码中被注册

/*
  *      Add all the base protocols.
  */

 if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
         pr_crit("%s: Cannot add ICMP protocol
", __func__);
 if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
         pr_crit("%s: Cannot add UDP protocol
", __func__);
 if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
         pr_crit("%s: Cannot add TCP protocol
", __func__);

  

我们关注的是UDP协议层,因此对应的处理函数是udp_rcv。这就是数据从IP层通往UDP层的入口

UDP protocol layer

udp_rcv

udp_rcv函数只有一行代码用于直接调用__udp4_lib_rcv用于接收数据

__udp4_lib_rcv

__udp4_lib_rcv函数会检查packet是否合法,接着再获取UDP header,UDP数据报长度,源地址,目的地址,然后是一些完整性检查和checksum verification。

之前在IP层的时候,我们已经看到在将包传送到上层协议之前会将dst_entry和packet相绑定。如果socket和对应的dst_entry已经找到了,那么__udp4_lib_rcv会将包存入socket:

sk = skb_steal_sock(skb);
if (sk) {
  struct dst_entry *dst = skb_dst(skb);
  int ret;

  if (unlikely(sk->sk_rx_dst != dst))
    udp_sk_rx_dst_set(sk, dst);

  ret = udp_queue_rcv_skb(sk, skb);
  sock_put(sk);
  /* a return value > 0 means to resubmit the input, but
   * it wants the return to be -protocol, or 0
   */
  if (ret > 0)
    return -ret;
  return 0;
} else {

  

如果在之前的early_demux操作中没有找到socket,那么就会调用__udp4_lib_lookup_skb对receiving socket进行查找。无论上述哪种情况,最终数据将被存入socket:

ret = udp_queue_rcv_skb(sk, skb);
sock_put(sk);

  

如果没有找到socket,那么数据报将被丢弃:

/* No socket. Drop packet silently, if checksum is wrong */
if (udp_lib_checksum_complete(skb))
        goto csum_error;

UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0);

/*
 * Hmm.  We got an UDP packet to a port to which we
 * don't wanna listen.  Ignore it.
 */
kfree_skb(skb);
return 0;

  

udp_queue_rcv_skb

这个函数的初始部分如下所示:

  • 判断该socket是不是一个encapsulation socket,如果是的话,在继续处理前,将packet传送给本层的处理函数。
  • 确定该包是不是UPD-Lite数据包并做一些完整性检查
  • 检查UDP checksum,如果失败的话则丢弃

最终我们到达了处理receive queue的逻辑,首先检查socket对应的receive queue是不是已经满了:

if (sk_rcvqueues_full(sk, skb, sk->sk_rcvbuf))
  goto drop;

  

sk_rcvqueue_full

sk_rcvqueue_full函数会检查socket的backlog长度以及socket的sk_rmem_alloc来确认它们的和是否大于socket的sk_rcvbuf

/*
 * Take into account size of receive queue and backlog queue
 * Do not take into account this skb truesize,
 * to allow even a single big packet to come.
 */
static inline bool sk_rcvqueues_full(const struct sock *sk, const struct sk_buff *skb,
                                     unsigned int limit)
{
        unsigned int qsize = sk->sk_backlog.len + atomic_read(&sk->sk_rmem_alloc);

        return qsize > limit;
}

  

udp_queue_rcv_skb

一旦证明队列未满,则会继续将数据加入队列

bh_lock_sock(sk);
if (!sock_owned_by_user(sk))
  rc = __udp_queue_rcv_skb(sk, skb);
else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) {
  bh_unlock_sock(sk);
  goto drop;
}
bh_unlock_sock(sk);

return rc;

第一步先判断socket当前是否被用户进程占用。如果不是,则调用__udp_queue_rcv_skb将数据加入receive queue。如果是,则通过调用sk_add_backlog将数据加入backlog。backlog中的数据最终都会加入receive queue,socket相关的系统调用通过调用release_sock释放了该socket

__udp_queue_rcv_skb

__udp_queue_rcv_skb通过调用sock_queue_rcv_skb将数据加入receive queue,如果该数据不能被加入receive queue,则更新统计数据

rc = sock_queue_rcv_skb(sk, skb);
if (rc < 0) {
  int is_udplite = IS_UDPLITE(sk);

  /* Note that an ENOMEM error is charged twice */
  if (rc == -ENOMEM)
    UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_RCVBUFERRORS,is_udplite);

  UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
  kfree_skb(skb);
  trace_udp_fail_queue_rcv_skb(rc, sk);
  return -1;
}

  

Queuing data to a socket

现在数据已经通过调用sock_queue_rcv加入socket的队列了。这个函数在将数据加入队列前做了如下的操作:

  1. 判断socket申请的内存数量是否超过了receive buffer size,如果是的话,socket丢弃包的计数器将会增加
  2. sk_filter用于处理任何施加到该socket的Berkeley Packet Filter
  3. 运行sk_rmem_schedule确保有足够的receive buffer space用于接收数据
  4. 调用skb_set_owner_r,增加sk->sk_rmem_alloc
  5. 通过调用__skb_queue_tail将数据加入队列
  6. 最终,那些监听该socket的进程都会收到通知,从而调用sk_data_ready函数

以上就是数据如何到达系统,并通过整个协议栈到达socket并准备给用户进程使用的过程

原文链接:

https://blog.packagecloud.io/eng/2016/06/22/monitoring-tuning-linux-networking-stack-receiving-data/

原文地址:https://www.cnblogs.com/YaoDD/p/7615005.html