2-解决粘包问题

一、socket缓冲区

研究粘包之前先看看socket缓冲区的问题:

 

二、socket缓存区的详细解释

每个socket被创建后,都会分配两个缓冲区,输入缓冲区和输出缓冲区。

write()/send() 并不立即向网络中传输数据,而是先将数据写入缓冲区中,再由TCP协议将数据从缓冲区发送到目标机器。一旦将数据写入到缓冲区,函数就可以成功返回,不管它们有没有到达目标机器,也不管它们何时被发送到网络,这些都是TCP协议负责的事情。

TCP协议独立于write()/send()函数,数据有可能刚被写入缓冲区就发送到网络,也可能在缓冲区中不断积压,多次写入的数据被一次性发送到网络,这取决于当时的网络情况、当前线程是否空闲等诸多因素,不由程序员控制。

read()/recv()函数也是如此,也从输入缓冲区中读取数据,而不是直接从网络中读取。

这些I/O缓冲区特性可整理如下:
1.I/O缓冲区在每个TCP套接字中单独存在;
2.I/O缓冲区在创建套接字时自动生成;
3.即使关闭套接字也会继续传送输出缓冲区中遗留的数据;
4.关闭套接字将丢失输入缓冲区中的数据。

输入输出缓冲区的默认大小一般都是8K,可以通过getsockopt()函数获取

须知:只有TCP有粘包现象,UDP永远不会粘包!

具体原因:
发送端可以是一K一K地发送数据,而接收端的应用程序可以两K两K地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据,也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),一条消息有多少字节对应用程序是不可见的,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。

而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。

怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方send一条信息的时候,无论底层怎样分段分片,TCP协议层会把构成整条消息的数据段排序完成后才呈现在内核缓冲区。

例如基于tcp的套接字客户端往服务端上传文件,发送时文件内容是按照一段一段的字节流发送的,在接收方看来,根本不知道该文件的字节流从何处开始,在何处结束

所谓粘包问题主要是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。

此外,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一个TCP段。若连续几次需要send的数据都很少,通常TCP会根据优化算法把这些数据合成一个TCP段后一次发送出去,这样接收方就收到了粘包数据。

TCP和UDP的区别

TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个包更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的。

UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。不会使用块的合并优化算法,由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的。

解决粘包需要注意的问题

tcp是基于数据流的,于是收发的消息不能为空,这就需要在客户端和服务端都添加空消息的处理机制,防止程序卡住,而udp是基于数据报的,即便是你输入的是空内容(直接回车),那也不是空消息,udp协议会帮你封装上消息头。

udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成,若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠

tcp的协议数据不会丢,没有收完包,下次接收,会继续上次继续接收,己端总是在收到ack时才会清除缓冲区内容。数据是可靠的,但是会粘包。

 

三、两种情况下会发生粘包。

1、接收方没有及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)

服务端

import socket
import subprocess

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.bind(('127.0.0.1', 8080))
phone.listen(5)

while 1:  # 循环连接客户端
    conn, client_addr = phone.accept()
    print(client_addr)

    while 1:
        try:
            cmd = conn.recv(1024)
            ret = subprocess.Popen(cmd.decode('utf-8'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            correct_msg = ret.stdout.read()
            error_msg = ret.stderr.read()
            conn.send(correct_msg + error_msg)
        except ConnectionResetError:
            break

conn.close()
phone.close()
View Code

客户端

import socket

phone = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  # 买电话
phone.connect(('127.0.0.1',8080))  # 与客户端建立连接, 拨号

while 1:
    cmd = input('>>>')
    phone.send(cmd.encode('utf-8'))
    from_server_data = phone.recv(1024)
    print(from_server_data.decode('gbk'))
phone.close() 
# 由于客户端发的命令获取的结果大小已经超过1024,那么下次在输入命令,会继续取上次残留到缓存区的数据。
View Code

2、发送端需要等缓冲区满才发送出去,造成粘包(发送数据时间间隔很短,数据也很小,会合到一起,产生粘包)

服务端

import socket

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.bind(('127.0.0.1', 8080))
phone.listen(5)

conn, client_addr = phone.accept()
frist_data = conn.recv(1024)
print('1:',frist_data.decode('utf-8'))  # 1: helloworld
second_data = conn.recv(1024)
print('2:',second_data.decode('utf-8'))

conn.close()
phone.close()
View Code

客户端

import socket

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
phone.connect(('127.0.0.1', 8080)) 
phone.send(b'hello')
phone.send(b'world')

phone.close()  
# 两次返送信息时间间隔太短,数据小,造成服务端一次收取
View Code

 

四、粘包的解决方案

粘包问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕如何让发送端在发送数据前,把自己将要发送的字节流总数按照固定字节发送给接收端,后面跟上总数据,然后接收端先接收固定字节的总字节流,就可以知道需要接收的数据长度,再来一个死循环接收完所有数据。

方案一、低端版

服务端:

import socket
import subprocess
import struct

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8082))
server.listen(5)

while 1:
    conn, addr = server.accept()
    print(conn, addr)

    while 1:
        try:
            cmd = conn.recv(1024)
            obj = subprocess.Popen(cmd.decode('utf-8'),
                                   shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   )
            ret = obj.stdout.read()
            ret_err = obj.stderr.read()
            total_size = len(ret+ret_err)
            total_size_bytes = struct.pack('i', total_size )
            # print(total_size)
            conn.send(total_size_bytes)
            conn.send(ret)
            conn.send(ret_err)
            
        except Exception:
            break
    conn.close()
server.close()
View Code

客户端:

import socket
import struct

client = socket.socket()
client.connect(('127.0.0.1', 8082))

while 1:
    msg = input('>>>').strip()
    if not msg:continue
    elif msg.upper() == 'Q':break
    client.send(msg.encode('utf-8'))
    header_size_bytes = client.recv(4)
    header_size =struct.unpack('i', header_size_bytes)[0]

    client_recv_data = b''
    size = 0
    while size < header_size:
        data = client.recv(1024)
        client_recv_data += data
        size += len(data)
        
    rst = client_recv_data.decode('gbk')
    print(rst)

client.close()
View Code

结果:

服务端显示
<socket.socket fd=236, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8082), raddr=('127.0.0.1', 14106)> ('127.0.0.1', 14106)
931
1790
5040
7802
8130

客户端:
执行dir,ipconfig,tasklist,netstat -an等命令都可以正常显示
View Code

这种方法当文件较小时可以使用,当文件太大时,total_size_bytes = struct.pack('i', total_size )也无法表示

方案二、高端版(可自定制报头版)

整体的流程解释:

我们可以把报头做成字典,字典里包含将要发送的真实数据的描述信息(大小之类的),然后json序列化,然后用struct将序列化后的数据长度打包成4个字节。
我们在网络上传输的所有数据都叫做数据包,数据包里的所有数据都叫做报文,报文里面不止有数据,还有ip地址、mac地址、端口号等等,其实所有的报文都有报头,这个报头是协议规定的。

发送时:
先发报头长度
再发报头,需要编码报头内容,其中包括数据的长度
最后发真实数据内容

接收时:
先接收报头长度,用struct取出来
根据取出的长度收取报头内容,然后解码,反序列化
从反序列化的结果中取出待接收数据的描述信息(数据长度),然后再接收真实的数据内容
View Code

服务端

import socket
import subprocess
import struct
import json

server_ip = ('192.168.10.1', 8088)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(server_ip)
server_socket.listen(5)

while 1:
    conn, ipaddr = server_socket.accept()
    print(conn, ipaddr)
    
    while 1:
        try:
            cmd = conn.recv(1024)
            ret = subprocess.Popen(cmd.decode('utf-8'),
                                   shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,)
            msg = ret.stdout.read()
            err_msg = ret.stderr.read()
            
            # 1 数据总长度
            total_size = len(msg) + len(err_msg)
            # 2 制作报头字典
            header_dict = {
                'md5': 'fdsaf2143254f',
                'file_name': 'f1.txt',
                'total_size':total_size,
            }
        # 3 将报头字典转换成json再转换成bytes格式
            header_dict_json = json.dumps(header_dict)
             header_dict_bytes = header_dict_json.encode('utf-8')
            #   4 将报头长度转换成固定长度的struct格式
             header_dict_size = len(header_dict_bytes)
             header_dict_size_struct = struct.pack('i', header_dict_size)
        # 5 发送报头长度
            conn.send(header_size_struct)
             # 6 发送报头
            conn.send(header_dict)                
            # 7 发送真实数据:
            conn.send(msg)
            conn.send(error_msg)
        except ConnectionResetError:
            break

    conn.close()
phone.close()
View Code

客户端

import socket
import struct
import json
phone = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server_ip = ('192.168.10.1', 8088)
client_socket.connect(server_ip)

while 1:
    cmd = input('>>>')
    if not cmd:continue
    elif cmd.strip().upper() == 'Q':break
    client_socket.send(cmd.encode('utf-8'))

       # 1,接收固定报头长度的struct格式
    header_size_struct = client_socket.recv(4)

       #2,将struct格式的报头长度转换成int类型
       head_size = struct.unpack('i', header_size_struct)[0]

       #3,接收报头的字典的bytes类型,也就是json格式
       header_bytes = client_socket.recv(head_size)

       #4,将报头由bytes类型解码成字典
       header_dict = json.loads(header_bytes.decode('utf-8'))

       #5,从字典中获取数据总大小
       total_size = header_dict['total_size']
    
    #6,根据报头信息,循环接收真实数据
        recv_size = 0
        msg = b''
    while recv_size < total_size:
        recv_data = client_socket.recv(1024)
        msg += recv_data
        recv_size += len(recv_data)
        #由于msg是服务器在windows下执行命令的结果,默认编码为gbk,所以客户端显示需要使用gbk解码
        print(msg.decode('gbk'))

client_socket.close()
View Code

五、例子:FTP上传下载文件(简单版)

FTP上传下载文件(简单版)

server端

import socket
import struct
import json
sk = socket.socket()
# buffer = 4096 # 当双方接收发送的大小比较大的时候,比如4096,就会丢数据,改小了就ok,在linux上也是ok的。
buffer = 1024 #每次接收数据的大小
sk.bind(('127.0.0.1',8081))
sk.listen()

conn,addr = sk.accept()
print(conn, addr)
#接收文件头部大小,获取文件大小值,接收文件
head_len = conn.recv(4)
head_len = struct.unpack('i',head_len)[0] #解包
json_head = conn.recv(head_len).decode('utf-8') #反序列化
head = json.loads(json_head)
filesize = head['filesize']

with open(head['filename'],'wb') as f:
    recv_size = 0
    while recv_size < filesize:
        content = conn.recv(buffer)
        f.write(content)
        recv_size += len(content)
    print('文件名称:%s,文件大小:%s,已接收%s'%(head['filename'], filesize, recv_size))
conn.close()
sk.close()
View Code

client端

import os
import json
import socket
import struct
sk = socket.socket()
sk.connect(('127.0.0.1',8081))
buffer = 1024 #读取文件的时候,每次读取的大小
head = {
            'filepath':r'G:share#需要下载的文件路径,也就是文件所在的文件夹
            'filename':'1.py',  #改成上面filepath下的一个文件
            'filesize':None,
        }

file_path = os.path.join(head['filepath'],head['filename'])
filesize = os.path.getsize(file_path)
head['filesize'] = filesize
# json_head = json.dumps(head,ensure_ascii=False)  #字典转换成字符串
json_head = json.dumps(head)  #字典转换成字符串
bytes_head = json_head.encode('utf-8') #字符串转换成bytes类型

#计算head的长度,因为接收端先接收我们自己定制的报头
head_len = len(bytes_head) #报头长度
pack_len = struct.pack('i',head_len)
sk.send(pack_len)  #先发送报头长度
sk.send(bytes_head) #再发送bytes类型的报头

#即便是视频文件,也是可以按行来读取的,也可以readline,也可以for循环,但是读取出来的数据大小就不固定了,影响效率,有可能读的比较小,也可能很大,像视频文件一般都是一行的二进制字节流。
#所有我们可以用read,设定一个一次读取内容的大小,一边读一边发,一边收一边写
with open(file_path,'rb') as f:
    send_size = 0
    while send_size < filesize:
        content = f.read(buffer)
        sk.send(content)
        send_size += len(content)
print('文件大小:%s,已发送%s'%(filesize, send_size))
sk.close()
View Code


FTP上传下载文件(面向对象升级版)

server端

import socket
import struct
import json
import subprocess
import os

class MYTCPServer:
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    allow_reuse_address = False
    max_packet_size = 1024
    coding='utf-8'
    request_queue_size = 5
    server_dir='file_upload'

    def __init__(self, server_address, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        self.server_address=server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.
        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.
        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.
        """
        self.socket.close()

    def get_request(self):
        """Get the request and client address from the socket.
        """
        return self.socket.accept()

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()

    def run(self):
        while True:
            self.conn,self.client_addr=self.get_request()
            print('from client ',self.client_addr)
            while True:
                try:
                    head_struct = self.conn.recv(4)
                    if not head_struct:break

                    head_len = struct.unpack('i', head_struct)[0]
                    head_json = self.conn.recv(head_len).decode(self.coding)
                    head_dic = json.loads(head_json)

                    print(head_dic)
                    #head_dic={'cmd':'put','filename':'a.txt','filesize':123123}
                    cmd=head_dic['cmd']
                    if hasattr(self,cmd):
                        func=getattr(self,cmd)
                        func(head_dic)
                except Exception:
                    break

    def put(self,args):
        file_path=os.path.normpath(os.path.join(
            self.server_dir,
            args['filename']
        ))

        filesize=args['filesize']
        recv_size=0
        print('----->',file_path)
        with open(file_path,'wb') as f:
            while recv_size < filesize:
                recv_data=self.conn.recv(self.max_packet_size)
                f.write(recv_data)
                recv_size+=len(recv_data)
                print('recvsize:%s filesize:%s' %(recv_size,filesize))

tcpserver1=MYTCPServer(('127.0.0.1',8080))
tcpserver1.run()
View Code

client端

import socket
import struct
import json
import os

class MYTCPClient:
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    allow_reuse_address = False
    max_packet_size = 8192
    coding='utf-8'
    request_queue_size = 5

    def __init__(self, server_address, connect=True):
        self.server_address=server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if connect:
            try:
                self.client_connect()
            except:
                self.client_close()
                raise

    def client_connect(self):
        self.socket.connect(self.server_address)

    def client_close(self):
        self.socket.close()

    def run(self):
        while True:
            inp=input(">>: ").strip()
            if not inp:continue
            l=inp.split()
            cmd=l[0]
            if hasattr(self,cmd):
                func=getattr(self,cmd)
                func(l)

    def put(self,args):
        cmd=args[0]
        filename=args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' %filename)
            return
        else:
            filesize=os.path.getsize(filename)

        head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
        print(head_dic)
        head_json=json.dumps(head_dic)
        head_json_bytes=bytes(head_json,encoding=self.coding)

        head_struct=struct.pack('i',len(head_json_bytes))
        self.socket.send(head_struct)
        self.socket.send(head_json_bytes)
        send_size=0
        with open(filename,'rb') as f:
            for line in f:
                self.socket.send(line)
                send_size+=len(line)
                print(send_size)
            else:
                print('upload successful')

client=MYTCPClient(('127.0.0.1',8080))
client.run()
View Code

打印进度条示例

#=========知识储备==========
#进度条的效果
[#             ]
[##            ]
[###           ]
[####          ]

#指定宽度
print('[%-15s]' %'#')
print('[%-15s]' %'##')
print('[%-15s]' %'###')
print('[%-15s]' %'####')

#打印%
print('%s%%' %(100)) #第二个%号代表取消第一个%的特殊意义

#可传参来控制宽度
print('[%%-%ds]' %50) #[%-50s]
print(('[%%-%ds]' %50) %'#')
print(('[%%-%ds]' %50) %'##')
print(('[%%-%ds]' %50) %'###')


#=========实现打印进度条函数==========
import sys
import time

def progress(percent,width=50):
    if percent >= 1:
        percent=1
    show_str = ('%%-%ds' % width) % (int(width*percent)*'|')
    print('
%s %d%%' %(show_str, int(100*percent)), end='')


#=========应用==========
data_size=1025
recv_size=0
while recv_size < data_size:
    time.sleep(0.1) #模拟数据的传输延迟
    recv_size+=1024 #每次收1024

    percent=recv_size/data_size #接收的比例
    progress(percent,width=70) #进度条的宽度70
View Code
原文地址:https://www.cnblogs.com/dxnui119/p/10144029.html