module04-3-select版FTP

需求


1. 使用select或selectors模块实现并发简单版FTP

2. 允许多用户并发上传下载文件

目录结构


ftp_server
    ├ bin   # 执行文件目录
    |   └ ftp_server.py     # 执行程序 
    ├ conf  # 配置文件目录
    |   ├ setting           # 配置文件。目前主要保存服务端sock和数据存储空间地址
    |   └ init_setting.py   # 配置文件格式化程序
    ├ core  # 程序核心代码位置
    |   └ main.py           # 主逻辑交互程序
    └ storage   # 服务端的数据存储空间       

ftp_client
    ├ bin   # 执行文件目录
    |   └ ftp_client.py     # 执行程序 
    ├ conf  # 配置文件目录
    |   ├ setting           # 配置文件。目前主要保存服务端sock和客户端文件下载目录地址
    |   └ init_setting.py   # 配置文件格式化程序
    ├ core  # 程序核心代码位置
    |   └ main.py           # 主逻辑交互程序
    └ download   # 客户端文件下载目录   

 代码


ftp_server
import os
import sys

# The project root is the parent of the bin/ directory holding this script;
# putting it first on sys.path makes the ``core`` package importable.
_script_dir = os.path.dirname(os.path.abspath(__file__))
basepath = os.path.dirname(_script_dir)
sys.path.insert(0, basepath)

from core import main

# Hand control to the server's main loop.
main.main()
bin/ftp_server.py
 1 import configparser
 2 
 3 c = configparser.ConfigParser()
 4 
 5 c['server_info'] = {
 6     'server_address' : ('0.0.0.0',12345),
 7     'storage_dir' : 'storage'
 8 }
 9 
10 with open('setting','w',encoding='utf-8') as f:
11     c.write(f)
conf/init_setting.py
  1 #! /usr/bin/env python3
  2 # -*- coding: utf-8 -*-
  3 # Author:Jailly
  4 
  5 import configparser,os,selectors,socket,json
  6 
  7 basepath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  8 setting_path = os.path.join(os.path.join(basepath, 'conf'), 'setting')
  9 
 10 
 11 def get_setting(setting_path):
 12     cf = configparser.ConfigParser()
 13     cf.read(setting_path)
 14     setting_dict = {}
 15     for section in cf.sections():
 16         setting_dict[section] = {}
 17         for key,value in cf.items(section):
 18             setting_dict[section][key] = value
 19 
 20     return setting_dict
 21 
 22 
 23 setting_dict = get_setting(setting_path)
 24 server_address = eval(setting_dict['server_info']['server_address'])
 25 storage_dir = os.path.join(basepath, setting_dict['server_info']['storage_dir'])
 26 
 27 
 28 class FTPServer(object):
 29     '''selectors版的ftp服务器类,封装ftp服务器的相关处理方法'''
 30 
 31     def __init__(self,sock):
 32         self.sock = sock
 33         self.server = socket.socket()
 34         self.server.setblocking(False)
 35         self.selector = selectors.DefaultSelector()
 36         self.file_path_dict = {}
 37         self.file_size_dict = {}
 38         self.get_fd_dict = {}
 39         self.put_fd_dict = {}
 40         self.put_file_size_dict = {}
 41 
 42     def startup(self):
 43         self.server.bind(self.sock)
 44         self.server.listen()
 45         self.selector.register(self.server,selectors.EVENT_READ,self.accept)
 46 
 47 
 48     def accept(self,server):
 49         conn,addr = server.accept()
 50         conn.setblocking(False)
 51         self.selector.register(conn,selectors.EVENT_WRITE,self.write)
 52         # self.selector.register(conn,selectors.EVENT_READ,self.read)
 53 
 54 
 55     def write(self,conn):
 56         # 发送存储空间内的文件列表
 57         file_list = os.listdir(storage_dir)
 58         conn.send(json.dumps(file_list).encode())
 59         self.selector.unregister(conn)
 60         self.selector.register(conn,selectors.EVENT_READ,self.read)
 61 
 62 
 63     def read(self,conn):
 64 
 65         cmd = conn.recv(1024).decode('utf-8')
 66 
 67         if len(cmd.split()) == 2:
 68             action = cmd.split()[0]
 69             file_path = os.path.join(storage_dir, cmd.split()[1])
 70 
 71             if action == 'put':
 72                 conn.send(b'0')  # 配合客户端完成阻塞,以防止客户端粘包(c:1)
 73 
 74                 file_path = os.path.join(storage_dir,os.path.basename(file_path))
 75                 file_path = self.rename(file_path) if os.path.isfile(file_path) else file_path
 76                 f = open(file_path, 'wb')
 77                 self.put_fd_dict[conn] = f
 78                 self.put_file_size_dict[conn] = 0
 79 
 80                 self.selector.unregister(conn)
 81                 self.selector.register(conn,selectors.EVENT_READ,self.put1)
 82 
 83             elif action == 'get':
 84                 if os.path.isfile(file_path):
 85                     conn.send(b'0')  # 发送 0 ,告知客户端 要下载的文件存在,开始文件传输
 86 
 87                     self.file_path_dict[conn] = file_path
 88 
 89                     self.selector.unregister(conn)
 90                     self.selector.register(conn,selectors.EVENT_READ,self.get1)
 91                 else:
 92                     print('prepare to tell client the file that the user wanna to get does not exist')
 93                     conn.send(b'1')  # # 发送 1 告知 客户端 文件不存在
 94                     print('has informed the client ')
 95                     self.selector.unregister(conn)
 96                     self.selector.register(conn,selectors.EVENT_WRITE,self.write)
 97 
 98         else:
 99             if cmd == 'exit' or cmd == '':
100                 self.selector.unregister(conn)
101                 conn.close()
102 
103             elif cmd == 'skip':
104                 self.selector.unregister(conn)
105                 self.selector.register(conn,selectors.EVENT_WRITE,self.write)
106 
107 
108     def get1(self,conn):
109         conn.recv(1024)  # 阻塞,以防止服务端粘包(s:1)
110         print('prepare to send file_size')
111         # 发送 下载文件的大小
112         file_path = self.file_path_dict[conn]
113         file_size = os.stat(file_path).st_size
114         self.file_size_dict[conn] = file_size
115 
116         conn.send(str(file_size).encode())
117         print('send file_size ',file_size)
118 
119         self.selector.unregister(conn)
120         self.selector.register(conn,selectors.EVENT_READ,self.get2)
121 
122 
123     def get2(self,conn):
124         conn.recv(1024)  # 阻塞,以防止服务端粘包(s:2)
125         print('send file start!')
126         file_path = self.file_path_dict[conn]
127 
128         # with open(file_path,'rb') as f:
129         #     for line in f:
130         #         # print(line)
131         #         conn.send(line)
132         #
133         # print('send file end!')
134 
135         # get_progress = 0
136         f = open(file_path,'rb')
137         self.get_fd_dict[conn] = f
138 
139         self.selector.unregister(conn)
140         self.selector.register(conn, selectors.EVENT_WRITE, self.get3)
141 
142         conn.send(f.readline())
143 
144 
145     def get3(self,conn):
146         f = self.get_fd_dict[conn]
147 
148         conn.send(f.readline())
149 
150         get_progress = f.tell()
151         file_size = self.file_size_dict[conn]
152 
153         if get_progress == file_size:
154             del self.get_fd_dict[conn]
155             del self.file_size_dict[conn]
156             del self.file_path_dict[conn]
157             f.close()
158             self.write(conn)
159 
160 
161     def put1(self,conn):
162         file_size = int(conn.recv(1024).decode('utf-8'))
163         self.file_size_dict[conn] = file_size
164         conn.send(b'0')  # 配合客户端完成阻塞,以防止客户端粘包(c:2)
165 
166         self.selector.unregister(conn)
167         self.selector.register(conn,selectors.EVENT_READ,self.put2)
168 
169 
170     def put2(self,conn):
171 
172         f = self.put_fd_dict[conn]
173         accepting_data = conn.recv(1024)
174         f.write(accepting_data)
175 
176         self.put_file_size_dict[conn] += len(accepting_data)
177 
178         if self.put_file_size_dict[conn] == self.file_size_dict[conn]:
179             del self.put_file_size_dict[conn]
180             del self.file_size_dict[conn]
181             del self.put_fd_dict[conn]
182 
183             f.close()
184             self.write(conn)
185 
186 
187     def rename(self,file_path,times = 1):
188         f_name,f_extension = os.path.splitext(file_path)
189         f_name += '(%s)'%str(times)
190         new_file_path = f_name + f_extension
191 
192         if os.path.isfile(new_file_path):
193             times += 1
194             return self.rename(file_path,times)
195         else:
196             return new_file_path
197 
198     def monitor(self):
199         while 1:
200             ready_list = self.selector.select()
201             for key,event in ready_list:
202                 key.data(key.fileobj)
203 
204 
def main():
    """Build the server on the configured address and serve forever."""
    server = FTPServer(server_address)
    server.startup()
    server.monitor()


if __name__ == '__main__':
    main()
core/main.py
ftp_client
import os
import sys

# The project root is the parent of the bin/ directory holding this script;
# putting it first on sys.path makes the ``core`` package importable.
_script_dir = os.path.dirname(os.path.abspath(__file__))
basepath = os.path.dirname(_script_dir)
sys.path.insert(0, basepath)

from core import main

# Hand control to the client's interactive loop.
main.main()
bin/ftp_client.py
# ``configparser`` was used without being imported, so the script failed
# with NameError on its first line.
import configparser
import os

# Always write the file next to this script (conf/setting).  core/main.py
# reads <project>/conf/setting, so a path relative to the current working
# directory would put the file in the wrong place whenever the script is
# launched from anywhere other than conf/.
setting_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'setting')

cf = configparser.ConfigParser()

cf['client_info'] = {
    # Stored as the text form of a (host, port) tuple; core/main.py turns
    # this string back into a tuple when it loads the settings.
    'server_address': str(('127.0.0.1', 12345)),
    # Directory name, resolved relative to the project root by core/main.py.
    'download_dir': 'download',
}

with open(setting_path, 'w', encoding='utf-8') as f:
    cf.write(f)
conf/init_setting.py
  1 #! /usr/bin/env python3
  2 # -*- coding:utf-8 -*-
  3 # Author:Jailly
  4 
  5 import socket,os,configparser,json,sys
  6 
  7 basepath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  8 setting_path = os.path.join(os.path.join(basepath, 'conf'), 'setting')
  9 
 10 
 11 def get_setting(setting_path):
 12     cf = configparser.ConfigParser()
 13     cf.read(setting_path)
 14     setting_dict = {}
 15     for section in cf.sections():
 16         setting_dict[section] = {}
 17         for key,value in cf.items(section):
 18             setting_dict[section][key] = value
 19 
 20     return setting_dict
 21 
 22 
 23 setting_dict = get_setting(setting_path)
 24 server_address = eval(setting_dict['client_info']['server_address'])
 25 download_dir = os.path.join(basepath, setting_dict['client_info']['download_dir'])
 26 
 27 
 28 class FTPClient(object):
 29 
 30     def __init__(self,server_adress):
 31         self.socket = socket.socket()
 32         self.socket.connect(server_adress)
 33 
 34 
 35     def rename(self,file_path,times=1):
 36         f_name,f_extension = os.path.splitext(file_path)
 37         f_name += '(%s)'%str(times)
 38         new_file_path = f_name + f_extension
 39 
 40         if os.path.isfile(new_file_path):
 41             times += 1
 42             return self.rename(file_path,times)
 43         else:
 44             return new_file_path
 45 
 46 
 47     def get(self,file_path):
 48         self.socket.send(b'0')  # 配合服务端完成阻塞,以防止服务端粘包(s:1)
 49         # print('preare to get file_size')
 50         file_size = int(self.socket.recv(1024).decode('utf-8'))
 51         # print('get file_size ',file_size)
 52         self.socket.send(b'0')  # 配合服务端完成阻塞,以防止服务端粘包(s:2)
 53 
 54         file_path = os.path.join(download_dir,file_path)
 55         # print('prepare to handle file_path')
 56         real_file_path = self.rename(file_path) if os.path.isfile(file_path) else file_path
 57         # print('finished handling file_path')
 58         accepted_size = 0
 59         last_accepted_size = 0
 60         print('Downloading %s: '%os.path.basename(file_path),end='')
 61         with open(real_file_path,'wb') as f:
 62             while accepted_size < file_size:
 63                 if accepted_size >= file_size - 1024:
 64                     buffersize = file_size - accepted_size
 65                 else:
 66                     buffersize = 1024
 67 
 68                 accepting_data = self.socket.recv(buffersize)
 69                 f.write(accepting_data)
 70                 accepted_size += len(accepting_data)
 71 
 72                 # 打印进度条
 73                 bar_num = (accepted_size - last_accepted_size) * 100 // file_size
 74 
 75                 if bar_num:
 76                     sys.stdout.write('#'*bar_num)
 77                     sys.stdout.flush()
 78                     last_accepted_size = accepted_size  # 只有当前后2次的差达到文件大小的1%时,才为代表上一次传输大小的变量(last_accepted_size)赋值
 79 
 80             print(' done')
 81 
 82     def put(self,file_path):
 83         self.socket.recv(1024)  # 阻塞,以防止客户端粘包(c:1)
 84 
 85         file_size = os.stat(file_path).st_size
 86         self.socket.send(str(file_size).encode())
 87 
 88         self.socket.recv(1024)    # 阻塞,以防止客户端粘包(c:2)
 89 
 90         send_size = 0
 91         last_send_size = 0
 92         print('Uploading %s: '%os.path.basename(file_path),end = '')
 93         with open(file_path,'br') as f:
 94             for line in f:
 95                 self.socket.send(line)
 96                 send_size += len(line)
 97 
 98                 bar_num = (send_size - last_send_size)*100 // file_size
 99                 if bar_num:
100                     sys.stdout.write('#'*bar_num)
101                     sys.stdout.flush()
102                     last_send_size = send_size
103 
104         print(' done')
105 
106 
107     def interactive(self):
108         while 1:
109             # print('prepare to get file_list')
110             file_list = json.loads(self.socket.recv(1024).decode())
111             print('
服务器存储空间的文件列表:
%s
'%('
'.join([file for file in file_list]) if file_list
112                                            else '33[1;32m目前尚未有文件33[0m'))
113             # print('has got file_list')
114             cmd = input('请输入指令:').strip()
115 
116             if cmd == '':
117                 print('输入不能为空')
118                 continue
119 
120             if len(cmd.split()) == 2:
121                 action = cmd.split()[0]
122                 file_path = cmd.split()[1]
123 
124                 if action == 'put':
125                     if os.path.isfile(file_path):
126                         self.socket.send(cmd.encode())
127                         self.put(file_path)
128                     else:
129                         self.socket.send(b'skip')
130                         print('33[1;31m您要上传的文件不存在33[0m')
131 
132                 elif action == 'get':
133                     self.socket.send(cmd.encode())
134                     exist_flag = self.socket.recv(1024).decode('utf-8')
135                     # print('exist_flag: ',exist_flag)
136                     if exist_flag == '0':
137                         self.get(file_path)
138                     else:
139                         self.socket.send(b'skip')
140                         print('33[1;31m您要下载的文件不存在33[0m')
141                 else:
142                     self.socket.send(b'skip')
143                     print('33[1;31m错误指令,请重试!33[0m')
144 
145             else:
146                 if cmd == 'exit':
147                     self.socket.send(cmd.encode())
148                     self.socket.close()
149                     return
150                 else:
151                     self.socket.send(b'skip')
152                     print('33[1;31m错误指令,请重试!33[0m')
153 
154 
def main():
    """Connect to the configured server and start the interactive session."""
    client = FTPClient(server_address)
    client.interactive()


if __name__ == '__main__':
    main()
core/main.py
原文地址:https://www.cnblogs.com/jailly/p/7266565.html