多用户在线FTP程序 泽桐

项目名:多用户在线FTP程序

一、需求

1用户加密认证

2允许同时多用户登录

3每个用户有自己的家目录,且只能访问自己的家目录

4对用户进行磁盘配额,每个用户的可用空间不同

5允许用户在ftp server上随意切换目录

6允许用户查看当前目录下文件

7允许上传和下载文件,保证文件一致性

8文件传输过程中显示进度条

9支持文件的断点续传

 

二、代码结构

服务端:

 

客户端:

 

具体代码:

服务端:

server.py

# -*- coding: utf-8 -*-
import socket, os, json, re, struct, threading, time
from lib import commons
from conf import settings
from core import logger

class Server(object):
    """Blocking TCP front end that serves every accepted client on its own thread.

    Constructing an instance creates the working directories, binds and
    listens on the address configured in ``settings`` and then enters the
    accept loop, so the constructor does not return.
    """

    def __init__(self):
        self.init_dir()
        self.sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.sock.bind((settings.server_bind_ip, settings.server_bind_port))
        self.sock.listen(settings.server_listen)
        print("\033[42;1mserver started sucessful!\033[0m")
        self.run()

    @staticmethod
    def init_dir():
        """Create the ``logs``, ``db`` and ``home`` directories under ``settings.base_path``."""
        for name in ('logs', 'db', 'home'):
            # exist_ok avoids the check-then-create race of exists() + mkdir().
            os.makedirs(os.path.join(settings.base_path, name), exist_ok=True)

    def run(self):
        """Accept connections forever and hand each one to a new handler thread."""
        while True:  # connection loop
            # Keep the per-connection socket local: storing it on the shared
            # Server instance would be overwritten by the next accept().
            conn, cli_addr = self.sock.accept()
            handler = TCPHandler()
            worker = threading.Thread(target=handler.handle, args=(conn, cli_addr))
            worker.start()

class TCPHandler(object):

    STATUS_CODE = {
        200 : 'Passed authentication!',
        201 : 'Wrong username or password!',
        202 : 'Username does not exist!',
        300 : 'cmd successful , the target path be returned in returnPath',
        301 : 'cmd format error!',
        302 : 'The path or file could not be found!',
        303 : 'The dir is exist',
        304 : 'The file has been downloaded or the size of the file is exceptions',
        305 : 'Free space is not enough',
        401 : 'File MD5 inspection failed',
        400 : 'File MD5 inspection success',
    }

    def __init__(self):
        """Obtain the 'server' logger and record that this handler was created."""
        server_log = logger.logger('server')
        self.server_logger = server_log
        server_log.debug("server TCPHandler started successful!")

    def handle(self, request, address):
        self.conn = request
        self.cli_addr = address
        self.server_logger.info('client[%s:%s] is conecting'% (self.cli_addr[0], self.cli_addr[1]))
        print('client[%s:%s] is conecting'% (self.cli_addr[0], self.cli_addr[1]))
        while True:#通讯循环
            try:
                #1、接收客户端的ftp命令
                header_dic, req_dic = self.recv_request()
                if not header_dic:break
                if not header_dic['cmd']:break
                print('收到客户端ftp指令:%s'%header_dic['cmd'])
                #2、解析ftp命令,获取相应命令参数(文件名)
                cmds = header_dic['cmd'].split()#['register',]、['get', 'a.txt']
                if hasattr(self, cmds[0]):
                    self.server_logger.info('interface:[%s], request:{client:[%s:%s] action:[%s]}'% (
                                                    cmds[0], self.cli_addr[0], self.cli_addr[1], header_dic['cmd']))
                    getattr(self, cmds[0])(header_dic, req_dic)
            except (ConnectionResetError, ConnectionAbortedError):break
            except socket.timeout:
                print('time out %s'%((self.cli_addr,)))
                break
        self.conn.close()
        self.server_logger.info('client %s is disconect'% ((self.cli_addr,)))
        print('client[%s:%s] is disconect'% (self.cli_addr[0], self.cli_addr[1]))

    def unpack_header(self):
        try:
            pack_obj = self.conn.recv(4)
            header_size = struct.unpack('i', pack_obj)[0]
            header_bytes = self.conn.recv(header_size)
            header_json = header_bytes.decode('utf-8')
            header_dic = json.loads(header_json)
            return header_dic
        except struct.error:#避免客户端发送错误格式的header_size
            return

    def unpack_info(self, info_size):
        recv_size = 0
        info_bytes = b''
        while recv_size < info_size:
            res = self.conn.recv(1024)
            info_bytes += res
            recv_size += len(res)
        info_json = info_bytes.decode('utf-8')
        info_dic = json.loads(info_json)#{'username':ton, 'password':123}
        info_md5 = commons.getStrsMd5(info_bytes)
        return info_dic, info_md5

    def recv_request(self):
        header_dic = self.unpack_header()#{'cmd':'register','info_size':0}
        if not header_dic:return None, None
        req_dic, info_md5 = self.unpack_info(header_dic['info_size'])
        if header_dic.get('md5'):
            #校检请求内容md5一致性
            if info_md5 == header_dic['md5']:
                print('\033[42;1m请求内容md5校检结果一致\033[0m')
            else:
                print('\033[31;1m请求内容md5校检结果不一致\033[0m')
        return header_dic, req_dic

    def response(self, **kwargs):
        """Send ``kwargs`` to the client as a JSON body preceded by its header.

        Three writes go out in order: the packed header length, the header
        (carrying the body size and md5) and the body itself.
        """
        body = commons.getDictBytes(kwargs)
        digest = commons.getStrsMd5(body)
        size_pack, header = commons.make_header(info_size=len(body), md5=digest)
        for part in (size_pack, header, body):
            self.conn.sendall(part)

    def register(self, header_dic, req_dic):#{'cmd':'register','info_size':0,'resultCode':0,'resultDesc':None}
        username = req_dic['user_info']['username']
        #更新数据库,并制作响应信息字典
        if not os.path.isfile(os.path.join(settings.db_file, '%s.json'%username)):
            #更新数据库
            user_info = dict()
            user_info['username'] = username
            user_info['password'] = req_dic['user_info']['password']
            user_info['home'] = os.path.join(settings.user_home_dir, username)
            user_info['quota'] = settings.user_quota*(1024*1024)
            commons.save_to_file(user_info, os.path.join(settings.db_file, '%s.json'%username))
            resultCode = 0
            resultDesc = None
            #创建家目录
            if not os.path.exists(os.path.join(settings.user_home_dir, username)):
                os.mkdir(os.path.join(settings.user_home_dir, username))
            self.server_logger.info('client[%s:%s] 注册用户[%s]成功'% (self.cli_addr[0], self.cli_addr[1], username))
        else:
            resultCode = 1
            resultDesc = '该用户已存在,注册失败'
            self.server_logger.warning('client[%s:%s] 注册用户[%s]失败:%s'% (self.cli_addr[0], self.cli_addr[1],
                                                                     username, resultDesc))
        #响应客户端注册请求
        self.response(resultCode=resultCode, resultDesc=resultDesc)

    @staticmethod
    def auth(req_dic):
        print(req_dic['user_info'])
        user_info = None
        status_code = 201
        try:
            req_username = req_dic['user_info']['username']
            db_file = os.path.join(settings.db_file, '%s.json'%req_username)
            #验证用户名密码,并制作响应信息字典
            if not os.path.isfile(db_file):
                status_code = 202
            else:
                with open(db_file, 'r') as f:
                    user_info_db = json.load(f)
                if user_info_db['password'] == req_dic['user_info']['password']:
                    status_code = 200
                    user_info = user_info_db
            return status_code, user_info
        #捕获  客户端鉴权请求时发送一个空字典或错误的字典  的异常
        except KeyError:
            return 201, user_info

    def login(self, header_dic, req_dic):
        #鉴权
        status_code, user_info = self.auth(req_dic)
        #响应客户端登陆请求
        self.response(user_info=user_info, resultCode=status_code)

    def query_quota(self, header_dic, req_dic):
        used_quota = None
        total_quota = None
        #鉴权
        status_code, user_info = self.auth(req_dic)
        #查询配额
        if status_code == 200:
            used_quota = commons.getFileSize(user_info['home'])
            total_quota = user_info['quota']
        #响应客户端配额查询请求
        self.response(resultCode=status_code, total_quota=total_quota, used_quota=used_quota)

    @staticmethod
    def parse_file_path(req_path, cur_path):
        req_path = req_path.replace(r'/', '\\')
        req_path = req_path.replace(r'//', r'/',)
        req_path = req_path.replace('\\\\', '\\')
        req_path = req_path.replace('~\\', '', 1)
        req_path = req_path.replace(r'~', '', 1)
        req_paths = re.findall(r'[^\\]+', req_path)
        cur_paths = re.findall(r'[^\\]+', cur_path)
        cur_paths.extend(req_paths)
        cur_paths[0] += '\\'
        while '.' in cur_paths:
            cur_paths.remove('.')
        while '..' in cur_paths:
            for index,item in enumerate(cur_paths):
                if item == '..':
                    cur_paths.pop(index)
                    cur_paths.pop(index-1)
                    break
        return cur_paths

    def cd(self, header_dic, req_dic):
        cmds = header_dic['cmd'].split()
        #鉴权
        status_code, user_info = self.auth(req_dic)
        home = os.path.join(settings.user_home_dir, user_info['username'])
        #先定义响应信息
        returnPath = req_dic['user_info']['cur_path']
        if status_code == 200:
            if len(cmds) != 1:
                #解析cd的真实路径
                cur_path = os.path.join(settings.user_home_dir, req_dic['user_info']['cur_path'])
                cd_path = os.path.join('', *self.parse_file_path(cmds[1], cur_path))
                print('cd解析后的路径:', cd_path)
                if os.path.isdir(cd_path):
                    if home in cd_path:
                        resultCode = 300
                        returnPath = cd_path.replace('%s\\'%settings.user_home_dir, '', 1)
                    else:
                        resultCode = 302
                else:
                    resultCode = 302
            else:
                resultCode = 301
        else:
            resultCode = 201
        #响应客户端的cd命令结果
        print('cd发送给客户端的路径:', returnPath)
        self.response(resultCode=resultCode, returnPath=returnPath)

    def ls(self, header_dic, req_dic):
        cmds = header_dic['cmd'].split()
        #鉴权
        status_code, user_info = self.auth(req_dic)
        home = os.path.join(settings.user_home_dir, user_info['username'])
        #先定义响应信息
        returnFilenames = None
        if status_code == 200:
            if len(cmds) <= 2:
                #解析ls的真实路径
                cur_path = os.path.join(settings.user_home_dir, req_dic['user_info']['cur_path'])
                if len(cmds) == 2:
                    ls_path = os.path.join('', *self.parse_file_path(cmds[1], cur_path))
                else:
                    ls_path = cur_path
                print('ls解析后的路径:', ls_path)
                if os.path.isdir(ls_path):
                    if home in ls_path:
                        returnCode, filenames = commons.getFile(ls_path, home)
                        resultCode = 300
                        returnFilenames = filenames
                    else:
                       resultCode = 302
                else:
                    resultCode = 302
            else:
                resultCode = 301
        else:
            resultCode = 201
        #响应客户端的ls命令结果
        self.response(resultCode=resultCode, returnFilenames=returnFilenames)

    def rm(self, header_dic, req_dic):
        cmds = header_dic['cmd'].split()
        #鉴权
        status_code, user_info = self.auth(req_dic)
        home = os.path.join(settings.user_home_dir, user_info['username'])
        #先定义响应信息
        if status_code == 200:
            if len(cmds) == 2:
                #解析rm的真实路径
                cur_path = os.path.join(settings.user_home_dir, req_dic['user_info']['cur_path'])
                rm_path = os.path.join('', *self.parse_file_path(os.path.dirname(cmds[1]), cur_path))
                rm_file = os.path.join(rm_path, os.path.basename(cmds[1]))
                print('rm解析后的文件或文件夹:', rm_file)
                if os.path.exists(rm_file):
                    if home in rm_file:
                        commons.rmdirs(rm_file)
                        resultCode = 300
                    else:
                       resultCode = 302
                else:
                    resultCode = 302
            else:
                resultCode = 301
        else:
            resultCode = 201
        #响应客户端的rm命令结果
        self.response(resultCode=resultCode)

    def mkdir(self, header_dic, req_dic):
        cmds = header_dic['cmd'].split()
        #鉴权
        status_code, user_info = self.auth(req_dic)
        home = os.path.join(settings.user_home_dir, user_info['username'])
        #先定义响应信息
        if status_code == 200:
            if len(cmds) == 2:
                #解析rm的真实路径
                cur_path = os.path.join(settings.user_home_dir, req_dic['user_info']['cur_path'])
                mkdir_path = os.path.join('', *self.parse_file_path(cmds[1], cur_path))
                print('mkdir解析后的文件夹:', mkdir_path)
                if not os.path.isdir(mkdir_path):
                    if home in mkdir_path:
                        os.makedirs(mkdir_path)
                        resultCode = 300
                    else:
                       resultCode = 302
                else:
                    resultCode = 303
            else:
                resultCode = 301
        else:
            resultCode = 201
        #响应客户端的mkdir命令结果
        self.response(resultCode=resultCode)

    def get(self, header_dic, req_dic):
        """客户端下载文件"""
        cmds = header_dic['cmd'].split()#['get', 'a.txt', 'download']
        get_file = None
        #鉴权
        status_code, user_info = self.auth(req_dic)
        home = os.path.join(settings.user_home_dir, user_info['username'])
        #解析断点续传信息
        position = 0
        if req_dic['resume'] and isinstance(req_dic['position'], int):
            position = req_dic['position']
        #先定义响应信息
        resultCode = 300
        FileSize = None
        FileMd5 = None
        if status_code == 200:
            if 1 < len(cmds) < 4:
                #解析需要get文件的真实路径
                cur_path = os.path.join(settings.user_home_dir, req_dic['user_info']['cur_path'])
                get_file = os.path.join('', *self.parse_file_path(cmds[1], cur_path))
                print('get解析后的路径:', get_file)
                if os.path.isfile(get_file):
                    if home in get_file:
                        FileSize = commons.getFileSize(get_file)
                        if position >= FileSize != 0:
                            resultCode = 304
                        else:
                            resultCode = 300
                            FileSize = FileSize
                            FileMd5 = commons.getFileMd5(get_file)
                    else:
                        resultCode = 302
                else:
                    resultCode = 302
            else:
                resultCode = 301
        else:
            resultCode = 201
        #响应客户端的get命令结果
        self.response(resultCode=resultCode, FileSize=FileSize, FileMd5=FileMd5)
        if resultCode == 300:
            #发送文件数据
            with open(get_file, 'rb') as f:
                f.seek(position)
                for line in f:
                    self.conn.sendall(line)

    def put(self, header_dic, req_dic):
        cmds = header_dic['cmd'].split()#['put', 'download/a.txt', 'video']
        put_file = None
        #鉴权
        status_code, user_info = self.auth(req_dic)
        home = os.path.join(settings.user_home_dir, user_info['username'])
        #查询配额
        used_quota = commons.getFileSize(user_info['home'])
        total_quota = user_info['quota']
        #先定义响应信息
        if status_code == 200:
            if 1 < len(cmds) < 4:
                #解析需要put文件的真实路径
                cur_path = os.path.join(settings.user_home_dir, req_dic['user_info']['cur_path'])
                if len(cmds) == 3:
                    put_file = os.path.join(os.path.join('', *self.parse_file_path(cmds[2], cur_path)), os.path.basename(cmds[1]))
                else:
                    put_file = os.path.join(cur_path, os.path.basename(cmds[1]))
                print('put解析后的文件:', put_file)
                put_path = os.path.dirname(put_file)
                if os.path.isdir(put_path):
                    if home in put_path:
                        if (req_dic['FileSize'] + used_quota) <= total_quota:
                            resultCode = 300
                        else:
                            resultCode =  305
                    else:
                        resultCode = 302
                else:
                    resultCode = 302
            else:
                resultCode = 301
        else:
            resultCode = 201
        #响应客户端的put命令结果
        self.response(resultCode=resultCode)
        if resultCode == 300:
            #接收文件数据,写入文件
            recv_size = 0
            with open(put_file, 'wb') as f:
                while recv_size < req_dic['FileSize']:
                    file_data = self.conn.recv(1024)
                    f.write(file_data)
                    recv_size += len(file_data)
            #校检文件md5一致性
            if commons.getFileMd5(put_file) == req_dic['FileMd5']:
                resultCode = 400
                print('\033[42;1m文件md5校检结果一致\033[0m')
                print('\033[42;1m文件上传成功,大小:%d,文件名:%s\033[0m'% (req_dic['FileSize'], put_file))
            else:
                os.remove(put_file)
                resultCode = 401
                print('\033[31;1m文件md5校检结果不一致\033[0m')
                print('\033[42;1m文件上传失败\033[0m')
            #返回上传文件是否成功响应
            self.response(resultCode=resultCode)
server.py

logger.py

# -*- coding: utf-8 -*-
import logging
from logging import handlers
from conf import settings

def logger(log_type):
    """Return the logger named ``log_type``, attaching its file handler once.

    Records are written to ``<settings.base_path>/logs/<log_type>.log``,
    rotated daily with three backups kept. The handler level comes from
    ``settings.log_level`` ('debug', 'info', 'warning' or 'error'; any other
    value means CRITICAL).

    :param log_type: logger name, also used as the log file name
    :return: the configured ``logging.Logger``
    """
    levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
    }
    level = levels.get(settings.log_level, logging.CRITICAL)
    log = logging.getLogger(log_type)
    log.setLevel(logging.DEBUG)
    # Build the handler only when the logger has none yet: creating one per
    # call opened the log file again each time and leaked the unused handler.
    if not log.handlers:
        fh = handlers.TimedRotatingFileHandler(filename='%s/logs/%s.log'% (settings.base_path, log_type),
                                               when='D', interval=1, backupCount=3)
        fh.setLevel(level)
        fh.setFormatter(logging.Formatter(fmt='%(asctime)s %(name)s [%(levelname)s] %(message)s',
                                          datefmt=None))
        log.addHandler(fh)
    return log
logger.py

commons.py

  1 # -*- coding: utf-8 -*-
  2 
  3 import subprocess, hashlib, struct, json, os
  4 
  5 def save_to_file(info, db_filename):
  6     with open(db_filename, 'w') as f:
  7         json.dump(info, f)
  8 
  9 def getDictBytes(dic):
 10     dic_json = json.dumps(dic)
 11     dic_json_bytes = dic_json.encode('utf-8')
 12     return dic_json_bytes
 13 
 14 def make_header(info_size=0, cmd=None, md5=None):
 15     header_dic = {
 16         'cmd':cmd,
 17         'info_size':info_size,
 18         'md5':md5
 19     }
 20     header_json = json.dumps(header_dic)
 21     header_bytes = header_json.encode('utf-8')
 22     header_size_pack = struct.pack('i', len(header_bytes))
 23     return header_size_pack, header_bytes
 24 
 25 def getFileSize(path, size=0):
 26     """获取路径下的总大小(字节)
 27     :param path: 文件路径
 28     :param size: 起始大小(字节)
 29     :return:总大小(字节)
 30     """
 31     if os.path.exists(path):
 32         size = size
 33         try:
 34             if os.path.isdir(path):
 35                 for item in os.listdir(path):
 36                     items_path = os.path.join(path, item)
 37                     if os.path.isdir(items_path):
 38                         size = getFileSize(items_path, size)
 39                     else:
 40                         size += os.path.getsize(items_path)
 41             else:
 42                 size = os.path.getsize(path)
 43         except PermissionError:pass
 44     else:
 45         return 0
 46     return size
 47 
 48 def getFile(path, home):
 49     """展示路径下的文件信息
 50     :param path:文件路径
 51     """
 52     if os.path.exists(path):
 53         res = 'total %d\n'%getFileSize(path)
 54         returnCode = 0
 55         if os.path.isdir(path):
 56             try:
 57                 for item in os.listdir(path):
 58                     items_path = os.path.join(path, item)
 59                     size = getFileSize(items_path)
 60                     if os.path.isdir(items_path):
 61                         res += 'd  %s  %s\n'% (item, size)
 62                     else:res += 'f  %s  %s\n'% (item, size)
 63             except PermissionError:pass
 64         else:
 65             res += 'f  %s  %s\n'% (path.replace('%s\\'%home, '', 1), getFileSize(path))
 66     else:
 67         returnCode = 1
 68         res = 'ls: error:没有那个文件或目录'
 69     return returnCode, res
 70 
 71 def getStrsMd5(*strs):
 72     """该函数用于获取字符串的md5值
 73     :param strs:命令结果
 74     :return:将摘要值返回为十六进制数字的字符串
 75     """
 76     md5 = hashlib.md5()
 77     for str in strs:
 78         md5.update(str)
 79     return md5.hexdigest()
 80 
 81 def getFileMd5(filename):
 82     """该函数用于获取文件的md5值
 83     :param filename:'文件名'
 84     :return:将摘要值返回为十六进制数字的字符串
 85     """
 86     if not os.path.isfile(filename):
 87         return
 88     md5 = hashlib.md5()
 89     with open(filename, 'rb') as f:
 90         for line in f:
 91             md5.update(line)
 92     return md5.hexdigest()
 93 
 94 def rmdirs(path):
 95     if os.path.exists(path):
 96         if os.path.isdir(path):
 97             sub_items = os.listdir(path)
 98             for sub_item in sub_items:
 99                 full_sub_item = os.path.join(path, sub_item)
100                 rmdirs(full_sub_item)
101             else:
102                 os.rmdir(path)
103         else:
104             os.remove(path)
105     else:
106         return 1
107     return 0
108 
109 # def exec_cmd(command):
110 #     """该函数用于执行系统命令,并返回结果
111 #     :param command:系统命令,str类型
112 #     :return:返回tuple(b'正常命令执行输出结果', b'错误命令执行输出结果')
113 #     """
114 #     res = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
115 #                            stderr=subprocess.PIPE)
116 #     res_out = res.stdout.read()
117 #     res_err = res.stderr.read()
118 #     return res_out, res_err
119 
120 
121 
122 
123 
124 
125 
126 # def make_file_header(getFile, filePermission):
127 #     """该函数用于制作文件固定长度报文头
128 #     :param getFile:客户端下载的文件
129 #     :param filePermission:客户端下载的文件的权限
130 #     :return:返回tuple(报文头长度包对象(struct_pack 'i'), 报文头内容bytes类型)
131 #     """
132 #     if filePermission:
133 #         md5 = getFileMd5(getFile)
134 #         if md5:
135 #             resultCode = 0
136 #             file_size = os.path.getsize(getFile)
137 #             failReason = None
138 #         else:
139 #             resultCode = 1
140 #             file_size = None
141 #             failReason = 'The file could not be found'
142 #     else:
143 #         md5= None
144 #         resultCode = 1
145 #         file_size = None
146 #         failReason = 'Permission Denied!'
147 #     header_dic = {
148 #         'filename':getFile,
149 #         'md5':md5,
150 #         'file_size':file_size,
151 #         'resultCode':resultCode,
152 #         'failReason':failReason
153 #     }
154 #     header_json = json.dumps(header_dic)
155 #     header_bytes = header_json.encode('utf-8')
156 #     header_size_pack = struct.pack('i', len(header_bytes))
157 #     return resultCode, header_size_pack, header_bytes
158 
159 # def make_str_header(*strs, cmd=None, resultCode=0, failReason=None):
160 #     """该函数用于制作字符串固定长度报文头
161 #     :param cmd:客户端请求的命令
162 #     :param resultCode:返回结果
163 #     :param failReason:失败原因说明
164 #     :param strs:命令结果
165 #     :return:返回tuple(报文头长度包对象(struct_pack 'i'), 报文头内容bytes类型)
166 #     """
167 #     md5 = getStrsMd5(*strs)
168 #     total_size = 0
169 #     for str in strs:
170 #         total_size += len(str)
171 #     header_dic = {
172 #         'cmd':cmd,
173 #         'md5':md5,
174 #         'total_size':total_size,
175 #         'resultCode':resultCode,
176 #         'failReason':failReason
177 #     }
178 #     header_json = json.dumps(header_dic)
179 #     header_bytes = header_json.encode('utf-8')
180 #     header_size_pack = struct.pack('i', len(header_bytes))
181 #     return header_size_pack, header_bytes
commons.py

settings.py

 1 # Author:ton
 2 # -*- coding: utf-8 -*-
 3 import sys, os
 4 
 5 base_path = sys.path[0]
 6 # Server settings
 7 server_bind_ip = '127.0.0.1'
 8 server_bind_port = 8080
 9 server_listen = 5
10 # Logging settings
11 log_level = 'debug'
12 # Directory holding the per-user database files
13 db_file = os.path.join(base_path, 'db')
14 # User settings
15 user_home_dir = os.path.join(base_path, 'home')
16 user_quota = 10# default quota, unit: M
settings.py

客户端:

client.py

  1 # -*- coding: utf-8 -*-
  2 import socket, struct, json, os, time
  3 from lib import commons
  4 from conf import settings
  5 from core import logger
  6 from core import progressBar
  7 
  8 class Client(object):
  9 
 10     STATUS_CODE = {
 11         200 : 'Passed authentication!',
 12         201 : 'Wrong username or password!',
 13         202 : 'Username does not exist!',
 14         300 : 'cmd successful , the target path be returned in returnPath',
 15         301 : 'cmd format error!',
 16         302 : 'The path or file could not be found!',
 17         303 : 'The dir is exist',
 18         304 : 'The file has been downloaded or the size of the file is exceptions',
 19         305 : 'Free space is not enough',
 20         401 : 'File MD5 inspection failed',
 21         400 : 'File MD5 inspection success',
 22     }
 23 
 24     def __init__(self):
 25         try:
 26             self.init_dir()
 27             self.client_logger = logger.logger('client')
 28             self.connect()
 29             self.user_info = {}
 30             self.run()
 31             self.client.close()
 32         except (ConnectionResetError, ConnectionRefusedError, ConnectionAbortedError):
 33             self.client_logger.info('Cannot find server, client will be closed...')
 34             print('\033[31;1mCannot find server, client will be closed...\033[0m')
 35 
 36     @staticmethod
 37     def init_dir():
 38         if not os.path.exists(os.path.join(settings.base_path, 'logs')):
 39             os.makedirs(os.path.join(settings.base_path, 'logs'))
 40         if not os.path.exists(os.path.join(settings.base_path, 'downloadList')):
 41             os.makedirs(os.path.join(settings.base_path, 'downloadList'))
 42 
 43     def connect(self):
 44         self.client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
 45         self.client.connect((settings.client_connect_ip,settings.client_connect_port))
 46         self.client_logger.info('Has been successfully connected to the server[%s:%s]'%
 47                                 (settings.client_connect_ip, settings.client_connect_port))
 48         print('Has been successfully connected to the server[%s:%s]'%
 49               (settings.client_connect_ip, settings.client_connect_port))
 50 
 51     def run(self):
 52         while True:
 53             if self.user_info:
 54                 self.interactive()
 55             print('\n欢迎来到MyFTP程序\n'
 56             '1、注册\n'
 57             '2、登陆\n'
 58             '3、退出')
 59             choice = input('>>').strip()
 60             if choice == '1':
 61                 self.register('register')
 62             elif choice == '2':
 63                 self.login('login')
 64             elif choice == '3':
 65                 exit('Bye')
 66 
 67     def unpack_header(self):
 68         pack_obj = self.client.recv(4)
 69         header_size = struct.unpack('i', pack_obj)[0]
 70         header_bytes = self.client.recv(header_size)
 71         header_json = header_bytes.decode('utf-8')
 72         header_dic = json.loads(header_json)
 73         return header_dic
 74 
 75     def unpack_info(self, info_size):
 76         recv_size = 0
 77         info_bytes = b''
 78         while recv_size < info_size:
 79             res = self.client.recv(1024)
 80             info_bytes += res
 81             recv_size += len(res)
 82         info_json = info_bytes.decode('utf-8')
 83         info_dic = json.loads(info_json)#{'username':ton, 'password':123}
 84         info_md5 = commons.getStrsMd5(info_bytes)
 85         return info_dic, info_md5
 86 
 87     def recv_response(self):
 88         header_dic = self.unpack_header()
 89         rsp_dic, info_md5 = self.unpack_info(header_dic['info_size'])
 90         if header_dic.get('md5'):
 91             #校检响应内容md5一致性
 92             if info_md5 == header_dic['md5']:
 93                 print('\033[42;1m响应内容md5校检结果一致\033[0m')
 94             else:
 95                 print('\033[31;1m响应内容md5校检结果不一致\033[0m')
 96         return rsp_dic
 97 
 98     def request(self, cmd, **kwargs):
 99         #1、向服务端发送命令、鉴权请求'cmd',cmd='login'、'cd'、'get 1.mp3'
100         req_info = kwargs
101         req_info_bytes = json.dumps(req_info).encode('utf-8')
102         md5 = commons.getStrsMd5(req_info_bytes)
103         header_size_pack, header_bytes = commons.make_header(info_size=len(req_info_bytes), cmd=cmd, md5=md5)
104         self.client.sendall(header_size_pack)
105         self.client.sendall(header_bytes)
106         #2、发送user_info用于服务端鉴权
107         self.client.sendall(req_info_bytes)
108 
109     def register(self, cmd):
110         username = input('username:').strip()
111         password = input('password:').strip()
112         password = commons.getStrsMd5(password.encode('utf-8'))#密码md5值
113         user_info = {'username':username, 'password':password}
114         #1、向服务端发送注册请求'register'及注册信息
115         self.request(cmd, user_info=user_info)
116         self.client_logger.info('向服务端发送注册请求,用户名:%s,密码:%s'% (username, password))
117         #2、接收服务端响应结果
118         rsp_dic = self.recv_response()
119         if rsp_dic['resultCode'] == 1:
120             self.client_logger.info('%s'%rsp_dic['resultDesc'])
121             print('\033[31;1m%s\033[0m'%rsp_dic['resultDesc'])
122             return
123         self.client_logger.info('用户[%s]注册成功'%username)
124         print('\033[42;1m用户[%s]注册成功\033[0m'%username)
125 
126     def login(self, cmd):
127         username = input('username:').strip()
128         password = input('password:').strip()
129         password = commons.getStrsMd5(password.encode('utf-8'))#密码md5值
130         user_info = {'username':username, 'password':password}
131         #1、向服务端发送登陆请求'login'
132         self.request(cmd, user_info=user_info)
133         self.client_logger.info('向服务端发送登陆请求,用户名:%s,密码:%s'% (username, password))
134         #2、接收服务端响应结果
135         rsp_dic = self.recv_response()
136         if rsp_dic['resultCode'] != 200:
137             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
138             return
139         print(rsp_dic)
140         self.user_info = rsp_dic['user_info']
141         self.user_info['password'] = password
142         self.user_info['cur_path'] = username
143         print('\033[42;1m用户[%s]登陆成功\033[0m'%username)
144 
145     def interactive(self):
146         while True:#通讯循环
147             if not self.user_info:break
148             self.query_quota()
149             print('%dM/%dM'% (self.user_info['used_quota']/(1024*1024), self.user_info['total_quota']/(1024*1024)))
150             cmd = input('%s>'%self.user_info['cur_path']).strip()#'get a.txt'、'login'
151             if not cmd:continue
152             if cmd == 'exit':
153                 self.user_info = {}
154                 break
155             cmds = cmd.split()#['get', 'a.txt']
156             if hasattr(self, cmds[0]):
157                 self.client_logger.info('calling method:[%s], cmd:[%s]}'% (cmds[0], cmd))
158                 getattr(self, cmds[0])(cmd)
159             else:
160                 print('语法错误,仅支持ls,cd,get(断点续传),put,rm,mkdir,listDownload(下载列表)')
161 
162     def query_quota(self):
163         #1、向服务端发送查询配额空间的请求'query_quota',并鉴权
164         self.request('query_quota', user_info=self.user_info)
165         #2、接收服务端配额查询结果
166         rsp_dic = self.recv_response()
167         if rsp_dic['resultCode'] == 200:
168             self.user_info['total_quota'] = rsp_dic['total_quota']
169             self.user_info['used_quota'] = rsp_dic['used_quota']
170         else:
171             print('\033[31;1m%s,pls login again!\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
172             self.user_info = {}
173 
174     def cd(self, cmd):
175         #判断语法
176         cmds = cmd.split()
177         if len(cmds) != 2:
178             print("用法错误,example:'cd music'、'cd ~'、'cd .'、'cd ..'")
179             return
180         #1、向服务端发送cd命令'cd music'、'cd ~'、'cd .'、'cd ..',并鉴权
181         self.request(cmd, user_info=self.user_info)
182         #2、接收服务端cd命令响应结果
183         rsp_dic = self.recv_response()
184         if rsp_dic['resultCode'] != 300:
185             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
186             return
187         self.user_info['cur_path'] = rsp_dic['returnPath']
188 
189     def ls(self, cmd):
190         #判断语法
191         cmds = cmd.split()
192         if len(cmds) > 2:
193             print("用法错误,example:'ls music'、'ls ~'、'ls ~/music'")
194             return
195         #1、向服务端发送ls命令'ls music'、'ls ~'、'ls ~/music',并鉴权
196         self.request(cmd, user_info=self.user_info)
197         #2、接收服务端ls命令响应结果
198         rsp_dic = self.recv_response()
199         if rsp_dic['resultCode'] != 300:
200             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
201             return
202         print(rsp_dic['returnFilenames'])
203 
204     def rm(self, cmd):
205         #判断语法
206         cmds = cmd.split()
207         if len(cmds) != 2:
208             print("用法错误,example:'rm music'、'rm 1.mp3'")
209             return
210         #1、向服务端发送rm命令'rm music'、'rm 1.mp3',并鉴权
211         self.request(cmd, user_info=self.user_info)
212         #2、接收服务端rm命令响应结果
213         rsp_dic = self.recv_response()
214         if rsp_dic['resultCode'] != 300:
215             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
216         else:
217             print('删除成功')
218 
219     def mkdir(self, cmd):
220         #判断语法
221         cmds = cmd.split()
222         if len(cmds) != 2:
223             print("用法错误,example:'mkdir music'、'mkdir music/1/11'")
224             return
225         #1、向服务端发送mkdir命令'mkdir music'、'mkdir music/1/11',并鉴权
226         self.request(cmd, user_info=self.user_info)
227         #2、接收服务端mkdir命令响应结果
228         rsp_dic = self.recv_response()
229         if rsp_dic['resultCode'] != 300:
230             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
231         else:
232             print('创建目录成功')
233 
234     def listDownload(self, cmd):
235         download_dic = self.load_downList()
236         if download_dic:
237             for key in download_dic:
238                 print('ori_file:%s  local_file:%s  Status:%s'% (key, download_dic[key]['local_file'], download_dic[key]['status']))
239             choice = input('请输入需要继续下载源文件名:').strip()
240             if download_dic.get(choice):
241                 if download_dic[choice]['status'] != 'finish':
242                     self.get(cmd='get %s %s'% (choice, os.path.dirname(download_dic[choice]['local_file'])), resume_flag=True)
243                 else:
244                     print('\033[31;1m文件已经下载完成\033[0m')
245             else:
246                 print('\033[31;1m请输入正确源文件名\033[0m')
247         else:
248             print('\033[31;1m下载列表为空,请先进行get下载\033[0m')
249 
250     def load_downList(self):
251         dl_list_file = '%s/downloadList/%s.json'% (settings.base_path, self.user_info['username'])
252         if os.path.isfile(dl_list_file):
253             with open(dl_list_file, 'r')  as f:
254                 download_dic = json.load(f)
255             return download_dic
256         else:
257             return
258 
259     def progressBar(self, total, cur=0):
260         if total == 0:
261             total = 1
262             cur = 1
263         last_percent = int(cur / total * 100)
264         while True:
265             cur = yield
266             cur_percent = int(cur / total * 100)
267             if cur_percent > last_percent:
268                 print('{0:3}%: '.format(cur_percent) + '#'*int(cur_percent/4), end='\r', flush=True)
269                 if cur == total:
270                     print('\n')
271                 last_percent = cur_percent
272 
273 
274     def get(self, cmd, resume_flag=False):
275         """下载文件"""
276         #判断语法
277         cmds = cmd.split()
278         if not 1 < len(cmds) < 4:
279             print("\033[31;1m用法错误,example:'get 1.mp3 download/'\033[0m")
280             return
281         #解析写入文件路径
282         recv_filename = os.path.basename(cmds[1])
283         if len(cmds) == 3:
284             recv_file_path = cmds[2]
285         else:
286             recv_file_path = settings.base_path
287         recv_filename = os.path.join(recv_file_path, recv_filename)
288         if not os.path.isdir(recv_file_path):
289             print('\033[31;1m本地路径[%s]不存在\033[0m'%recv_file_path)
290             return
291         #先定义请求信息
292         resume = False,
293         position = 0
294         #选择是否断点续传
295         if resume_flag:
296             resume = True
297             position = commons.getFileSize(recv_filename)
298         else:
299             if os.path.isfile(recv_filename):
300                 os.remove(recv_filename)
301         #1、向服务端发送ftp命令'get 1.mp3 download/',并鉴权
302         self.request(cmd, user_info=self.user_info, resume=resume, position=position)
303         #2、接收服务端get命令响应结果
304         rsp_dic = self.recv_response()
305         if rsp_dic['resultCode'] != 300:
306             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
307             return
308         #新增功能,先添加下载列表,并保存到文件:
309         download_dic = self.load_downList()
310         if download_dic:
311             download_dic[cmds[1]] = {'local_file':recv_filename, 'status':'loading'}
312         else:
313             download_dic = dict()
314             download_dic[cmds[1]] = {'local_file':recv_filename, 'status':'loading'}
315         with open('%s/downloadList/%s.json'% (settings.base_path, self.user_info['username']), 'w')  as f:
316             json.dump(download_dic, f)
317         #3、接收文件数据,写入文件
318         recv_size = position
319         with open(recv_filename, 'a+b') as f:
320             progress_generator = self.progressBar(total=rsp_dic['FileSize'], cur=recv_size)
321             progress_generator.__next__()
322             while recv_size < rsp_dic['FileSize']:
323                 file_data = self.client.recv(1024)
324                 f.write(file_data)
325                 recv_size += len(file_data)
326                 progress_generator.send(recv_size)
327         #校检文件md5一致性
328         if commons.getFileMd5(recv_filename) == rsp_dic['FileMd5']:
329             download_dic[cmds[1]]['status'] = 'finish'
330             with open('%s/downloadList/%s.json'% (settings.base_path, self.user_info['username']), 'w')  as f:
331                 json.dump(download_dic, f)
332             print('\033[42;1m文件md5校检结果一致\033[0m')
333             print('\033[42;1m文件下载成功,大小:%d,文件名:%s\033[0m'% (rsp_dic['FileSize'], recv_filename))
334         else:
335             os.remove(recv_filename)
336             download_dic[cmds[1]]['status'] = 'failed'
337             with open('%s/downloadList/%s.json'% (settings.base_path, self.user_info['username']), 'w')  as f:
338                 json.dump(download_dic, f)
339             print('\033[31;1m文件md5校检结果不一致\033[0m')
340             print('\033[42;1m文件下载失败\033[0m')
341 
342     def put(self, cmd):
343         cmds = cmd.split()
344         #判断语法
345         if not 1 < len(cmds) < 4:
346             print("用法错误,example:'put download/1.mp3 video'")
347             return
348         #判断文件是否存在
349         if not os.path.isfile(cmds[1]):
350             print('\033[31;1mThe file could not be found\033[0m')
351             return
352         #先定义请求信息
353         FileSize = commons.getFileSize(cmds[1])
354         FileMd5 = commons.getFileMd5(cmds[1])
355         #1、向服务端发送ftp命令'put download/2.mp3 video',并鉴权
356         self.request(cmd, user_info=self.user_info, FileSize=FileSize, FileMd5=FileMd5)
357         #2、接收服务端put命令响应结果
358         rsp_dic = self.recv_response()
359         if rsp_dic['resultCode'] != 300:
360             print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
361             return
362         #3、发送文件数据
363         progress_generator = self.progressBar(total=FileSize)
364         progress_generator.__next__()
365         send_size = 0
366         with open(cmds[1], 'rb') as f:
367             for line in f:
368                 self.client.sendall(line)
369                 send_size += len(line)
370                 progress_generator.send(send_size)
371         #4、等待服务端响应成功接收
372         rsp_dic = self.recv_response()
373         if rsp_dic['resultCode'] == 400:
374             print('\033[42;1m文件上传成功,文件名:%s\033[0m'%os.path.basename(cmds[1]))
375         else:
376             print('文件上传失败[%s]'%self.STATUS_CODE.get(rsp_dic['resultCode']))
377 
378     def pwd(self, cmd):
379         print(self.user_info['cur_path'])
client.py

progressBar.py

 1 # -*- coding: utf-8 -*-
 2 
 3 import sys
 4 
 5 class ProgressBar:
 6     def __init__(self, count = 0, total = 0, width = 50):
 7         self.count = count
 8         self.total = total
 9         self.width = width
10     def move(self):
11         self.count += 1
12     def over(self):
13         self.count = self.total
14     def log(self, s):
15         sys.stdout.write('%s'%s)
16         progress = int(self.width * self.count / self.total)
17         sys.stdout.write('{0:3}%/{1:1}%: '.format(self.count, self.total))
18         sys.stdout.write('#' * progress + '-' * (self.width - progress) + '\r')
19         if progress == self.
20             sys.stdout.write('\n')
21         sys.stdout.flush()
progressBar.py

logger.py

 1 # -*- coding: utf-8 -*-
 2 import logging
 3 from logging import handlers
 4 from conf import settings
 5 
 6 def logger(log_type):
 7     level = settings.log_level
 8     if level == 'debug':
 9         level = logging.DEBUG
10     elif level == 'info':
11         level = logging.INFO
12     elif level == 'warning':
13         level = logging.WARNING
14     elif level == 'error':
15         level = logging.ERROR
16     else:
17         level = logging.CRITICAL
18     #1.生成logger对象
19     logger = logging.getLogger(log_type)
20     logger.setLevel(logging.DEBUG)
21     #2.生成handler对象
22     fh = handlers.TimedRotatingFileHandler(filename='%s/logs/%s.log'% (settings.base_path, log_type),
23                                            when='D', interval=1, backupCount=3)
24     # fh = logging.FileHandler(log_file)
25     fh.setLevel(level)
26     # ch = logging.StreamHandler()
27     # ch.setLevel(level)
28     #2.1 把handler对象绑定到logger
29     logger.addHandler(fh)
30     # logger.addHandler(ch)
31     #3.生成formatter对象
32     # f = logging.Formatter(fmt='%(asctime)s %(name)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %I:%M:%S %p')
33     f = logging.Formatter(fmt='%(asctime)s %(name)s [%(levelname)s] %(message)s', datefmt=None)
34     #3.1 把formatter对象绑定到handler
35     fh.setFormatter(f)
36     # ch.setFormatter(f)
37     return logger
logger.py

commons.py

  1 # -*- coding: utf-8 -*-
  2 
  3 import subprocess, hashlib, struct, json, os, time
  4 
  5 def getStrsMd5(*strs):
  6     """该函数用于获取字符串的md5值
  7     :param strs:命令结果
  8     :return:将摘要值返回为十六进制数字的字符串
  9     """
 10     md5 = hashlib.md5()
 11     for str in strs:
 12         md5.update(str)
 13     return md5.hexdigest()
 14 
 15 def getDictBytes(dic):
 16     dic_json = json.dumps(dic)
 17     dic_json_bytes = dic_json.encode('utf-8')
 18     return dic_json_bytes
 19 
 20 def make_header(info_size=0, cmd=None, md5=None):
 21     header_dic = {
 22         'cmd':cmd,
 23         'info_size':info_size,
 24         'md5':md5
 25     }
 26     header_json = json.dumps(header_dic)
 27     header_bytes = header_json.encode('utf-8')
 28     header_size_pack = struct.pack('i', len(header_bytes))
 29     return header_size_pack, header_bytes
 30 
 31 def getFileMd5(filename):
 32     """该函数用于获取字符串的md5值
 33     :param filename:'文件名'
 34     :return:将摘要值返回为十六进制数字的字符串
 35     """
 36     if not os.path.isfile(filename):
 37         return
 38     md5 = hashlib.md5()
 39     with open(filename, 'rb') as f:
 40         for line in f:
 41             md5.update(line)
 42     return md5.hexdigest()
 43 
 44 def getFileSize(path, size=0):
 45     """获取路径下的总大小(字节)
 46     :param path: 文件路径
 47     :param size: 起始大小(字节)
 48     :return:总大小(字节)
 49     """
 50     if os.path.exists(path):
 51         size = size
 52         try:
 53             if os.path.isdir(path):
 54                 for item in os.listdir(path):
 55                     items_path = os.path.join(path, item)
 56                     if os.path.isdir(items_path):
 57                         size = getFileSize(items_path, size)
 58                     else:
 59                         size += os.path.getsize(items_path)
 60             else:
 61                 size = os.path.getsize(path)
 62         except PermissionError:pass
 63     else:
 64         return 0
 65     return size
 66 
 67 
 68 # def make_file_header(filename):
 69 #     """该函数用于制作自定义应用层协议固定长度包头
 70 #     :param filename:'文件名'
 71 #     :return:返回tuple(报文头长度包对象(struct_pack 'i'), 报文头内容bytes类型)
 72 #     """
 73 #     md5 = getFileMd5(filename)
 74 #     if md5:
 75 #         resultCode = 0
 76 #         file_size = os.path.getsize(filename)
 77 #         failReason = None
 78 #     else:
 79 #         resultCode = 1
 80 #         file_size = None
 81 #         failReason = 'The file could not be found'
 82 #     header_dic = {
 83 #         'filename':filename,
 84 #         'md5':md5,
 85 #         'file_size':file_size,
 86 #         'resultCode':resultCode,
 87 #         'failReason':failReason
 88 #     }
 89 #     header_json = json.dumps(header_dic)
 90 #     header_bytes = header_json.encode('utf-8')
 91 #     header_size_pack = struct.pack('i', len(header_bytes))
 92 #     return header_size_pack, header_bytes
 93 
 94 
 95 
 96 # def exec_cmd(command):
 97 #     """该函数用于执行系统命令,并返回结果
 98 #     :param command:系统命令,str类型
 99 #     :return:返回tuple(b'正常命令执行输出结果', b'错误命令执行输出结果')
100 #     """
101 #     res = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
102 #                            stderr=subprocess.PIPE)
103 #     res_out = res.stdout.read()
104 #     res_err = res.stderr.read()
105 #     return res_out, res_err
commons.py

settings.py

 1 # Author:ton
 2 # -*- coding: utf-8 -*-
 3 import sys
 4 base_path = sys.path[0]  # first sys.path entry (normally the launched script's directory); root for the logs/ and downloadList/ paths built elsewhere in this listing
 5 # server_bind_ip = '127.0.0.1'
 6 # server_bind_port = 8080
 7 # server_listen = 5
 8 client_connect_ip = '127.0.0.1'  # presumably the server address the client connects to -- confirm in client.py
 9 client_connect_port = 8080  # presumably the matching server port -- confirm in client.py
10 
11 log_level = 'debug'  # read by logger(): 'debug', 'info', 'warning' or 'error'; any other value selects CRITICAL
settings.py

三、使用截图

 

PS:支持多用户同时在线上传、下载文件,支持文件md5检验,支持断点续传

 

原文地址:https://www.cnblogs.com/linzetong/p/8290378.html