tcp输入数据 慢速路径处理 && oob数据 接收 && 数据接收 tcp_data_queue

大致的处理过程

TCP的接收流程:在tcp_v4_do_rcv中的相关处理(网卡收到报文触发)中,会首先通过tcp_check_urg设置tcp_sock的urg_data为TCP_URG_NOTYET(urgent point指向的可能不是本报文,而是后续报文或者前面收到的乱序报文),并保存最新的urgent data的sequence和对于的1 BYTE urgent data到tcp_sock的urg_data (如果之前的urgent data没有读取,就会被覆盖)。

用户接收流程:在tcp_recvmsg流程中,如果发现当前的skb的数据中有urgent data,首先拷贝urgent data之前的数据,然后tcp_recvmsg退出,提示用户来接收OOB数据;在用户下一次调用tcp_recvmsg来接收数据的时候,会跳过urgent data,并设置urgent data数据接收完成

/* This is the 'fast' part of urgent handling. 
Linxu内核在默认情况下,把urgent data实现为OOB数据
TCP的接收流程:在tcp_v4_do_rcv中的相关处理(网卡收到报文触发)中,会首先通过tcp_check_urg设置tcp_sock的urg_data为
TCP_URG_NOTYET(urgent point指向的可能不是本报文,而是后续报文或者前面收到的乱序报文),并保存最新的urgent data的sequence
和对于的1 BYTE urgent data到tcp_sock的urg_data (如果之前的urgent data没有读取,就会被覆盖)。

用户接收流程:在tcp_recvmsg流程中,如果发现当前的skb的数据中有urgent data,首先拷贝urgent data之前的数据,
然后tcp_recvmsg退出,提示用户来接收OOB数据;在用户下一次调用tcp_recvmsg来接收数据的时候,会跳过urgent data,
并设置urgent data数据接收完成。 相关的数据结构和定义

urg_data成员,其高8bit为urgent data的接收状态;其低8位为保存的1BYTE urgent数据。
urgent data的接收状态对应的宏的含义描述:


#defineTCP_URG_VALID 0x0100   //*urgent data已经读到了tcp_sock::urg_data

#defineTCP_URG_NOTYET   0x0200  //*已经发现有urgent data,还没有读取到tcp_sock::urg_data

#defineTCP_URG_READ       0x0400  //*urgent data已经被用户通过MSG_OOB读取了

*/
static void tcp_urg(struct sock *sk, struct sk_buff *skb, const struct tcphdr *th)
{
    struct tcp_sock *tp = tcp_sk(sk);

    /* Check if we get a new urgent pointer - normally not. */
    if (th->urg)/*收到了urgent data,则检查和设置urg_data和urg_seq成员*/  
        tcp_check_urg(sk, th);

    /* Do we wait for any urgent data? - normally not... */
    if (tp->urg_data == TCP_URG_NOTYET) {
        u32 ptr = tp->urg_seq - ntohl(th->seq) + (th->doff * 4) -
              th->syn;

        /* Is the urgent pointer pointing into this packet? 
        发现了有urgent data,但是还没有保存到tp->urg_data*/ */
        if (ptr < skb->len) {
            u8 tmp;
            if (skb_copy_bits(skb, ptr, &tmp, 1))
                BUG();
            tp->urg_data = TCP_URG_VALID | tmp;
            if (!sock_flag(sk, SOCK_DEAD))
                sk->sk_data_ready(sk);
        }
    }
}

用户接收数据接口

用户接收URG数据的接口
static int tcp_recv_urg(struct sock *sk, struct msghdr *msg, int len, int flags)
{
    struct tcp_sock *tp = tcp_sk(sk);

    /* No URG data to read. 用户已经读取过  或者没数据*/
    if (sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data ||
        tp->urg_data == TCP_URG_READ)
        return -EINVAL;    /* Yes this is right ! */

    if (sk->sk_state == TCP_CLOSE && !sock_flag(sk, SOCK_DONE))
        return -ENOTCONN;
    /*当前的tp->urg_data为合法的数据,可以读取*/  

    if (tp->urg_data & TCP_URG_VALID) {
        int err = 0;
        char c = tp->urg_data;

        if (!(flags & MSG_PEEK))
            tp->urg_data = TCP_URG_READ;

        /* Read urgent data. */
        msg->msg_flags |= MSG_OOB;

        if (len > 0) {
            if (!(flags & MSG_TRUNC))
                err = memcpy_to_msg(msg, &c, 1);
            len = 1;
        } else
            msg->msg_flags |= MSG_TRUNC;

        return err ? -EFAULT : len;
    }

    if (sk->sk_state == TCP_CLOSE || (sk->sk_shutdown & RCV_SHUTDOWN))
        return 0;

    /* Fixed the recv(..., MSG_OOB) behaviour.  BSD docs and
     * the available implementations agree in this case:
     * this call should never block, independent of the
     * blocking state of the socket.
     * Mike <pall@rz.uni-karlsruhe.de>
     */
    return -EAGAIN;
}
用户接收普通数据的接口中的相关处理

在用户接收数据的tcp_recvmsg函数中,在查找到待拷贝的skb后,首先拷贝urgent data数据前的数据,然后退出接收过程,在用户下一次执行tcp_recvmsg的时候跳过urgent data,设置urgent data读取结束

查找到准备拷贝的skb后的处理:

    int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
        int flags, int *addr_len)
{
-------------------------

    /* Urgent data needs to be handled specially. */
    if (flags & MSG_OOB)
        goto recv_urg;
        
        
---------------------------------------------        
            /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. 
        在接收完urgent data数据前的所有数据之后, tcp_recvmsg的以下代码片段得到执行,
        这段代码退出当前接收过程,提示用户有urgent data数据到来,需要用MSG_OOB来接收

        */
        if (tp->urg_data && tp->urg_seq == *seq) {
            if (copied)
                break;
            if (signal_pending(current)) {
                copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
                break;
            }
        }

-------------------------------------
/* Do we have urgent data here? */
        if (tp->urg_data) {/* 当前有urg_data数据*/ 
            u32 urg_offset = tp->urg_seq - *seq;
            if (urg_offset < used) {/*urgent data在当前待拷贝的数据范围内*/  
                if (!urg_offset) {/*待拷贝的数据就是urgent data,跨过该urgent data, 
        只给用户读取后面的数据*/  
                    if (!sock_flag(sk, SOCK_URGINLINE)) {
                        ++*seq;
                        urg_hole++;
                        offset++;
                        used--;
                        if (!used)
                            goto skip_copy;
                    }
                } else/*指定只拷贝urgent data数据之前的,完成后在下一次循环 
        开始的位置,会退出循环,返回用户;下一次用户调用tcp_recvmsg 
        就进入到上面的分支了*/  
                    used = urg_offset;
            }
        }
        
        ---------------------





skip_copy:/*用户读取的数据跨过了urgent point,设置读取结束 
        开启fast path*/  
        if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
            tp->urg_data = 0;
            tcp_fast_path_check(sk);
        }
-----------------------------------


out:
    release_sock(sk);
    return err;

recv_urg:
    err = tcp_recv_urg(sk, msg, len, flags);
    goto out;


}

TCP的urg数据,由于定义和实现上的混乱,当前已经不建议使用,但是为了兼容之前已经已经存在的实现,该机制会长期在内核中存在,如果不了解该机制及其内核行为,有可能就很难解释一些奇怪的问题:比如某段代码不小心地造成send接口事实上设置了MSG_OOB,就会造成接收端少了一个BYTE

}
/* This is what the send packet queuing engine uses to pass
 * TCP per-packet control information to the transmission code.
 * We also store the host-order sequence numbers in here too.
 * This is 44 bytes if IPV6 is enabled.
 * If this grows please adjust skbuff.h:skbuff->cb[xxx] size appropriately.
 */
struct tcp_skb_cb {
    __u32        seq;        /* Starting sequence number    // 当前tcp包的第一个序列号*/
    __u32        end_seq;    /* SEQ + FIN + SYN + datalen// 表示当前tcp 包结束的序列号:seq + FIN + SYN + 数据长度len    */
    union {
        /* Note : tcp_tw_isn is used in input path only
         *      (isn chosen by tcp_timewait_state_process())
         *
         *       tcp_gso_segs/size are used in write queue only,
         *      cf tcp_skb_pcount()/tcp_skb_mss()
         */
        __u32        tcp_tw_isn;
        struct {
            u16    tcp_gso_segs;
            u16    tcp_gso_size;
        };
    };
    __u8        tcp_flags;    /* TCP header flags. (tcp[13]) tcp头的flag    */

    __u8        sacked;        /* State flags for SACK/FACK. SACK/FACK的状态flag或者是sack option的偏移    */
#define TCPCB_SACKED_ACKED    0x01    /* SKB ACK'd by a SACK block    */
#define TCPCB_SACKED_RETRANS    0x02    /* SKB retransmitted        */
#define TCPCB_LOST        0x04    /* SKB is lost            */
#define TCPCB_TAGBITS        0x07    /* All tag bits            */
#define TCPCB_REPAIRED        0x10    /* SKB repaired (no skb_mstamp)    */
#define TCPCB_EVER_RETRANS    0x80    /* Ever retransmitted frame    */
#define TCPCB_RETRANS        (TCPCB_SACKED_RETRANS|TCPCB_EVER_RETRANS| 
                TCPCB_REPAIRED)

    __u8        ip_dsfield;    /* IPv4 tos or IPv6 dsfield    */
    __u8        txstamp_ack:1,    /* Record TX timestamp for ack? */
            eor:1,        /* Is skb MSG_EOR marked? */
            unused:6;
    __u32        ack_seq;    /* Sequence number ACK'd    ack(确认)的序列号*/
    union {
        struct {
            /* There is space for up to 24 bytes */
            __u32 in_flight:30,/* Bytes in flight at transmit */
                  is_app_limited:1, /* cwnd not fully used? */
                  unused:1;
            /* pkts S/ACKed so far upon tx of skb, incl retrans: */
            __u32 delivered;
            /* start of send pipeline phase */
            struct skb_mstamp first_tx_mstamp;
            /* when we reached the "delivered" count */
            struct skb_mstamp delivered_mstamp;
        } tx;   /* only used for outgoing skbs */
        union {
            struct inet_skb_parm    h4;
#if IS_ENABLED(CONFIG_IPV6)
            struct inet6_skb_parm    h6;
#endif
        } header;    /* For incoming skbs */
    };
};

static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
{
    struct tcp_sock *tp = tcp_sk(sk);
    bool fragstolen = false;
    int eaten = -1;
    //没有数据部分,直接释放
    if (TCP_SKB_CB(skb)->seq == TCP_SKB_CB(skb)->end_seq) {
        __kfree_skb(skb);
        return;
    }
    skb_dst_drop(skb);
    __skb_pull(skb, tcp_hdr(skb)->doff * 4);

    tcp_ecn_accept_cwr(tp, skb);

    tp->rx_opt.dsack = 0;

    /*  Queue data for delivery to the user.
     *  Packets in sequence go to the receive queue.
     *  Out of sequence packets to the out_of_order_queue.
     *///如果该数据包刚好是下一个要接收的数据,则可以直接copy到用户空间(如果存在且可用
    if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) {//非乱序包
        if (tcp_receive_window(tp) == 0)//接受窗口满了,不能接受
            goto out_of_window;

        /* Ok. In sequence. In window. */
        if (tp->ucopy.task == current &&//应用程序上下文中,在tcp_recvmsg
            tp->copied_seq == tp->rcv_nxt && tp->ucopy.len &&//正要读取这个包,并且应用程序还有剩余缓存
            sock_owned_by_user(sk) && !tp->urg_data) {//应用程序持有锁
            int chunk = min_t(unsigned int, skb->len,
                      tp->ucopy.len);/* 带读取长度和数据段长度的较小值 */

            __set_current_state(TASK_RUNNING);

            if (!skb_copy_datagram_msg(skb, 0, tp->ucopy.msg, chunk)) {//copy数据到ucopy中
                tp->ucopy.len -= chunk;
                tp->copied_seq += chunk;
                eaten = (chunk == skb->len);
                tcp_rcv_space_adjust(sk);//每次copy数据到ucopy都要调用该函数,动态更新sk_rcvbuf大小
            }
        }

        if (eaten <= 0) {//没有copy到ucopy,或者没有全部copy
queue_and_out:
            if (eaten < 0) {//没有copy  /* 没有拷贝到用户空间,对内存进行检查 */
                if (skb_queue_len(&sk->sk_receive_queue) == 0)//sk_receive_queue没有数据
                    sk_forced_mem_schedule(sk, skb->truesize);//检查是否需要分配配额
                else if (tcp_try_rmem_schedule(sk, skb, skb->truesize))//查看接收缓存空间够不够
                    goto drop;
            }
            eaten = tcp_queue_rcv(sk, skb, 0, &fragstolen);//添加到sk_receive_queue
        }
        tcp_rcv_nxt_update(tp, TCP_SKB_CB(skb)->end_seq);//尝试更新rcv_nxt
        if (skb->len)
            tcp_event_data_recv(sk, skb);
        if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
            tcp_fin(sk);    //fin处理

        if (!RB_EMPTY_ROOT(&tp->out_of_order_queue)) {//ofo队列非空
            tcp_ofo_queue(sk);//尝试把ofo中的包移动到sk_receive_queue中

            /* RFC2581. 4.2. SHOULD send immediate ACK, when
             * gap in queue is filled.
             */
            if (RB_EMPTY_ROOT(&tp->out_of_order_queue))
                inet_csk(sk)->icsk_ack.pingpong = 0;//ofo队列被清空了,需要立即发送ack
        }

        if (tp->rx_opt.num_sacks)// 新数据到达导致返回给对方的SACK Block 调整
            tcp_sack_remove(tp);//如果当前收到的包带sack,则删除其中不需要的部分,有可能ofo填充造成了rcv_next的移动

        tcp_fast_path_check(sk);//当前是slow path, 尝试开启快速路径  重新设置 tcp 首部预测标志

        if (eaten > 0)
            kfree_skb_partial(skb, fragstolen);//已经全部copy到ucopy,可以释放skb了
        if (!sock_flag(sk, SOCK_DEAD))
            sk->sk_data_ready(sk);//通知poll数据到达
        return;
    }
    /* 重传 */
    if (!after(TCP_SKB_CB(skb)->end_seq, tp->rcv_nxt)) {//判断是否是重传的旧包,标记dsack
        /* A retransmit, 2nd most common case.  Force an immediate ack. */
        NET_INC_STATS(sock_net(sk), LINUX_MIB_DELAYEDACKLOST);
        tcp_dsack_set(sk, TCP_SKB_CB(skb)->seq, TCP_SKB_CB(skb)->end_seq);//设置dsack

out_of_window:
        tcp_enter_quickack_mode(sk);//进入快速ack
        inet_csk_schedule_ack(sk);///标记需要ack
drop:
        tcp_drop(sk, skb);
        return;
    }

    /* Out of window. F.e. zero window probe. *///收到的包在接收窗口范围之外
     /* 窗口以外的数据,比如零窗口探测报文段 */
    if (!before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt + tcp_receive_window(tp)))
        goto out_of_window;
    //乱序包或者需要dsack都要快速ack

    tcp_enter_quickack_mode(sk);
    /* 数据段重叠 */
    if (before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt)) {
        /* Partial packet, seq < rcv_next < end_seq *///seq < rcv_next < end_seq,部分旧数据
        SOCK_DEBUG(sk, "partial packet: rcv_next %X seq %X - %X
",
               tp->rcv_nxt, TCP_SKB_CB(skb)->seq,
               TCP_SKB_CB(skb)->end_seq);

        tcp_dsack_set(sk, TCP_SKB_CB(skb)->seq, tp->rcv_nxt);//设置dsack

        /* If window is closed, drop tail of packet. But after
         * remembering D-SACK for its head made in previous line.
         */
        if (!tcp_receive_window(tp))//接收窗口不足,快速ack通知对方
            goto out_of_window;
        goto queue_and_out;//调用tcp_queue_rcv添加到sk_receive_queue,会尝试合并
    }

    tcp_data_queue_ofo(sk, skb);//乱续包,添加到ofo队列
}
原文地址:https://www.cnblogs.com/codestack/p/11919403.html