4月21号
服务端
import json, struct, os
from socket import *
class MyTcpServer:
    """TCP file server that handles one client connection at a time.

    Wire format (both directions): a 4-byte ``struct`` ``'i'`` length prefix
    followed by that many bytes of UTF-8 JSON. A request header carries a
    ``cmd`` key (``get`` or ``put``) and a ``filename``; a ``put`` header also
    carries ``filesize`` and is followed by the raw file bytes.
    """

    # Directory (relative to the working directory) files are read from / written to.
    base_dir = 'test_tcp'
    # Only these request commands may be dispatched to a method of the same name.
    commands = ('get', 'put')
    # Number of bytes moved per read/recv when streaming file contents.
    chunk_size = 4096

    def __init__(self, ipport):
        """Create the listening socket and bind it to *ipport* ``(host, port)``.

        Raises:
            OSError: if binding or listening fails (the socket is closed first).
        """
        self.ipport = ipport
        self.server = socket(AF_INET, SOCK_STREAM)
        try:
            self.server_bind()
            self.server_activate()
        except Exception:
            # Release the socket, then surface the failure instead of leaving
            # behind an object whose run() can only fail later.
            self.server_close()
            raise

    def server_bind(self):
        """Bind the listening socket to the configured address."""
        self.server.bind(self.ipport)

    def server_activate(self):
        """Start listening with a backlog of 5 pending connections."""
        self.server.listen(5)

    def server_head(self, filesize, status):
        """Send a response header ``{'filesize': ..., 'status': ...}`` to the client."""
        head_dic = {'filesize': filesize, 'status': status}
        head_json_bytes = json.dumps(head_dic).encode('utf-8')
        head_struct = struct.pack('i', len(head_json_bytes))
        # sendall() keeps writing until every byte is out; send() may stop early.
        self.conn.sendall(head_struct + head_json_bytes)

    def get_request(self):
        """Block until a client connects; return ``(connection, address)``."""
        print('等待连接...')
        return self.server.accept()

    def server_close(self):
        """Close the listening socket."""
        self.server.close()

    def close_request(self, request):
        """Close a client connection."""
        request.close()

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the client.

        Returns ``b''`` when the peer closes before *size* bytes arrive (or
        when *size* is not positive), because a single recv() may legally
        return fewer bytes than requested.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.conn.recv(remaining)
            if not chunk:
                return b''
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _resolve_path(self, filename):
        """Map a client-supplied *filename* to an absolute path inside ``base_dir``.

        Raises:
            ValueError: if the name is empty or would escape ``base_dir``
                (``..`` components, absolute paths).
        """
        base = os.path.abspath(self.base_dir)
        target = os.path.abspath(os.path.join(base, filename))
        if target == base or os.path.commonpath([base, target]) != base:
            raise ValueError('illegal file name: %r' % (filename,))
        return target

    def run(self):
        """Accept clients forever, serving each connection until it ends."""
        while True:
            self.conn, self.client_addr = self.get_request()
            print('from client ', self.client_addr)
            try:
                self._serve_connection()
            finally:
                # Always release the client socket, even after an error.
                self.close_request(self.conn)

    def _serve_connection(self):
        """Process requests on the current connection until EOF or an error."""
        while True:
            try:
                head_struct = self._recv_exact(4)
                if not head_struct:
                    break
                head_len = struct.unpack('i', head_struct)[0]
                head_json = self._recv_exact(head_len).decode('utf-8')
                head_dic = json.loads(head_json)
                print(head_dic)
                cmd = head_dic['cmd']
                # Whitelist instead of hasattr(): the client must not be able
                # to invoke run(), server_close() or any other attribute.
                if cmd in self.commands:
                    getattr(self, cmd)(head_dic)
            except Exception as exc:
                # Connection-level boundary: report the problem and drop the client.
                print('connection error:', exc)
                break

    def get(self, args):
        """Send the requested file: a header, then the raw bytes.

        A missing, non-regular or out-of-tree file is answered with a header
        whose ``status`` is ``False`` and no payload.
        """
        try:
            file_path = self._resolve_path(args['filename'])
        except (KeyError, TypeError, ValueError):
            self.server_head(None, False)
            return
        if not os.path.isfile(file_path):
            self.server_head(None, False)
            return
        filesize = os.path.getsize(file_path)
        self.server_head(filesize, True)
        with open(file_path, 'rb') as f:
            # Fixed-size chunks: iterating "lines" of a binary file can yield
            # arbitrarily large pieces.
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                self.conn.sendall(chunk)

    def put(self, args):
        """Receive ``args['filesize']`` bytes and store them as ``args['filename']``.

        Raises:
            ValueError: if the file name would escape ``base_dir``.
            ConnectionError: if the client disconnects before the upload is complete.
        """
        file_path = self._resolve_path(args['filename'])
        filesize = args['filesize']
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        recv_size = 0
        with open(file_path, 'wb') as f:
            while recv_size < filesize:
                # Never ask for more than what is left, otherwise the bytes of
                # the next request header would end up in the file.
                recv_data = self.conn.recv(min(self.chunk_size, filesize - recv_size))
                if not recv_data:
                    raise ConnectionError('client closed the connection during upload')
                f.write(recv_data)
                recv_size += len(recv_data)
# Start the file server on the loopback interface, port 8080; run() loops
# forever accepting clients.
server = MyTcpServer(ipport=('127.0.0.1', 8080))
server.run()
##并发
import json, struct, os
import socketserver
class MyRequestHandle(socketserver.BaseRequestHandler):
    """Per-connection handler for the length-prefixed JSON file-transfer protocol.

    Wire format (both directions): a 4-byte ``struct`` ``'i'`` length prefix
    followed by that many bytes of UTF-8 JSON. A request header carries a
    ``cmd`` key (``get`` or ``put``) and a ``filename``; a ``put`` header also
    carries ``filesize`` and is followed by the raw file bytes.
    """

    # Directory (relative to the working directory) files are read from / written to.
    base_dir = 'test_tcp'
    # Only these request commands may be dispatched to a method of the same name.
    commands = ('get', 'put')
    # Number of bytes moved per read/recv when streaming file contents.
    chunk_size = 4096

    def handle(self):
        """Serve requests on ``self.request`` until the client leaves or an error occurs."""
        print(self.client_address)
        try:
            while True:
                try:
                    head_struct = self._recv_exact(4)
                    if not head_struct:
                        break
                    head_len = struct.unpack('i', head_struct)[0]
                    head_json = self._recv_exact(head_len).decode('utf-8')
                    head_dic = json.loads(head_json)
                    print(head_dic)
                    cmd = head_dic['cmd']
                    # Whitelist instead of hasattr(): the client must not be
                    # able to invoke handle(), setup() or any other attribute.
                    if cmd in self.commands:
                        getattr(self, cmd)(head_dic)
                except Exception as exc:
                    # Connection-level boundary: report the problem and drop the client.
                    print('connection error:', exc)
                    break
        finally:
            self.request.close()

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the client.

        Returns ``b''`` when the peer closes before *size* bytes arrive (or
        when *size* is not positive), because a single recv() may legally
        return fewer bytes than requested.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                return b''
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _resolve_path(self, filename):
        """Map a client-supplied *filename* to an absolute path inside ``base_dir``.

        Raises:
            ValueError: if the name is empty or would escape ``base_dir``
                (``..`` components, absolute paths).
        """
        base = os.path.abspath(self.base_dir)
        target = os.path.abspath(os.path.join(base, filename))
        if target == base or os.path.commonpath([base, target]) != base:
            raise ValueError('illegal file name: %r' % (filename,))
        return target

    def server_head(self, filesize, status):
        """Send a response header ``{'filesize': ..., 'status': ...}`` to the client."""
        head_dic = {'filesize': filesize, 'status': status}
        head_json_bytes = json.dumps(head_dic).encode('utf-8')
        head_struct = struct.pack('i', len(head_json_bytes))
        # sendall() keeps writing until every byte is out; send() may stop early.
        self.request.sendall(head_struct + head_json_bytes)

    def get(self, args):
        """Send the requested file: a header, then the raw bytes.

        A missing, non-regular or out-of-tree file is answered with a header
        whose ``status`` is ``False`` and no payload.
        """
        try:
            file_path = self._resolve_path(args['filename'])
        except (KeyError, TypeError, ValueError):
            self.server_head(None, False)
            return
        if not os.path.isfile(file_path):
            self.server_head(None, False)
            return
        filesize = os.path.getsize(file_path)
        self.server_head(filesize, True)
        with open(file_path, 'rb') as f:
            # Fixed-size chunks: iterating "lines" of a binary file can yield
            # arbitrarily large pieces.
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                self.request.sendall(chunk)

    def put(self, args):
        """Receive ``args['filesize']`` bytes and store them as ``args['filename']``.

        Raises:
            ValueError: if the file name would escape ``base_dir``.
            ConnectionError: if the client disconnects before the upload is complete.
        """
        file_path = self._resolve_path(args['filename'])
        filesize = args['filesize']
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        recv_size = 0
        with open(file_path, 'wb') as f:
            while recv_size < filesize:
                # Never ask for more than what is left, otherwise the bytes of
                # the next request header would end up in the file.
                recv_data = self.request.recv(min(self.chunk_size, filesize - recv_size))
                if not recv_data:
                    raise ConnectionError('client closed the connection during upload')
                f.write(recv_data)
                recv_size += len(recv_data)
# Concurrent variant: ThreadingTCPServer hands every accepted connection to a
# MyRequestHandle instance.
s = socketserver.ThreadingTCPServer(
    server_address=('127.0.0.1', 8080),
    RequestHandlerClass=MyRequestHandle,
)
print("等待连接...")
s.serve_forever()
客户端
import json, struct, os
from socket import *
class MyTcpClient:
    """Interactive client for the length-prefixed JSON file-transfer protocol.

    Every header is sent as a 4-byte ``struct`` ``'i'`` length followed by
    that many bytes of UTF-8 JSON. Supported commands are ``get <file>`` and
    ``put <file>``.
    """

    # Commands the prompt may dispatch to a method of the same name.
    commands = ('get', 'put')

    def __init__(self, ipport):
        """Create the socket and connect to *ipport* ``(host, port)``.

        Raises:
            OSError: if the connection fails (the socket is closed first).
        """
        self.ipport = ipport
        self.client = socket(AF_INET, SOCK_STREAM)
        try:
            self.client_connect()
        except Exception:
            # Release the socket, then surface the failure instead of leaving
            # behind a client that cannot talk to anything.
            self.client_close()
            raise

    def client_connect(self):
        """Connect to the configured server address."""
        self.client.connect(self.ipport)

    def client_close(self):
        """Close the client socket."""
        self.client.close()

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the server.

        Raises:
            ConnectionError: if the server closes before *size* bytes arrive.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.client.recv(remaining)
            if not chunk:
                raise ConnectionError('server closed the connection')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _send_head(self, head_dic):
        """Serialize *head_dic* to JSON and send it with its 4-byte length prefix."""
        head_json_bytes = json.dumps(head_dic).encode('utf-8')
        head_struct = struct.pack('i', len(head_json_bytes))
        # sendall() keeps writing until every byte is out; send() may stop early.
        self.client.sendall(head_struct + head_json_bytes)

    def download(self, total_size, recv_size):
        """Redraw a 50-column progress bar for *recv_size* out of *total_size* bytes."""
        # An empty transfer is complete by definition (and must not divide by zero).
        percent = min(recv_size / total_size, 1) if total_size else 1
        res = int(50 * percent) * '#'
        # '\r' returns to the start of the line so the bar is redrawn in place.
        print('\r[%-50s] %d%%' % (res, int(100 * percent)), end='')

    def run(self):
        """Prompt for commands forever and dispatch ``get`` / ``put``."""
        while True:
            cmd = input("输入指令>>: ").strip()
            if not cmd:
                continue
            li = cmd.split()
            cmd = li[0]
            # Whitelist instead of hasattr(): typing e.g. "run" or "client"
            # must not call (or fail to call) arbitrary attributes.
            if cmd in self.commands:
                getattr(self, cmd)(li)

    def get(self, args):
        """Download ``args[1]`` from the server into the current directory.

        Raises:
            ConnectionError: if the server disconnects mid-transfer.
        """
        if len(args) < 2:
            print('usage: get <filename>')
            return
        cmd, filename = args[0], args[1]
        head_dic = {'cmd': cmd, 'filename': filename}
        print(head_dic)
        self._send_head(head_dic)
        rc_len = struct.unpack('i', self._recv_exact(4))[0]
        rc_dic = json.loads(self._recv_exact(rc_len).decode('utf-8'))
        if not rc_dic['status']:
            print("服务器没有此文件")
            return
        total_size = rc_dic['filesize']
        recv_size = 0
        # 'wb', not 'ab': downloading the same file twice must not append a
        # second copy to the first one.
        with open(filename, 'wb') as f:
            while recv_size < total_size:
                recv_data = self.client.recv(min(1024, total_size - recv_size))
                if not recv_data:
                    raise ConnectionError('server closed the connection during download')
                f.write(recv_data)
                recv_size += len(recv_data)
                self.download(total_size, recv_size)
        print('\n文件下载成功')

    def put(self, args):
        """Upload the local file ``args[1]`` to the server."""
        if len(args) < 2:
            print('usage: put <filename>')
            return
        cmd, filename = args[0], args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' % filename)
            return
        filesize = os.path.getsize(filename)
        head_dic = {'cmd': cmd, 'filename': os.path.basename(filename), 'filesize': filesize}
        print(head_dic)
        self._send_head(head_dic)
        send_size = 0
        with open(filename, 'rb') as f:
            # Fixed-size chunks: iterating "lines" of a binary file can yield
            # arbitrarily large pieces.
            while True:
                chunk = f.read(4096)
                if not chunk:
                    break
                self.client.sendall(chunk)
                send_size += len(chunk)
                self.download(filesize, send_size)
        print('\nupload successful')
# Connect to the file server on the loopback interface, port 8080, and enter
# the interactive command prompt.
client = MyTcpClient(ipport=('127.0.0.1', 8080))
client.run()
4月20号
服务端
import subprocess
import struct
from socket import *
# Remote command server: runs every received command line in a shell and
# answers with a 4-byte length header followed by stdout + stderr.
#
# SECURITY: the command text comes straight from the network and is executed
# with shell=True, so any client that can connect can run arbitrary commands.
# That is the point of this exercise; never bind it to anything but localhost.
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8083))
server.listen(5)

while True:
    conn, client_addr = server.accept()
    try:
        while True:
            cmd = conn.recv(1024)
            if not cmd:
                # Empty read: the client closed the connection.
                break
            obj = subprocess.Popen(
                cmd.decode('utf-8'),
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            # communicate() drains both pipes concurrently and reaps the child.
            # Reading stdout to EOF before touching stderr can deadlock once
            # the child fills the stderr pipe.
            stdout_res, stderr_res = obj.communicate()
            total_size = len(stdout_res) + len(stderr_res)
            header = struct.pack('i', total_size)
            # sendall() keeps writing until every byte is out; send() may stop early.
            conn.sendall(header + stdout_res + stderr_res)
    except Exception as exc:
        # Connection-level boundary: report the problem and wait for the next client.
        print('connection error:', exc)
    finally:
        conn.close()
客户端
import struct
from socket import *
# Remote command client: sends one command line per prompt and prints the
# length-prefixed reply (4-byte 'i' header, then the command output).
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8083))

while True:
    cmd = input('请输入命令>>:').strip()
    if not cmd:
        continue
    client.sendall(cmd.encode('utf-8'))

    # A single recv(4) may return fewer than 4 bytes, so collect the header.
    header = b''
    while len(header) < 4:
        part = client.recv(4 - len(header))
        if not part:
            break
        header += part
    if len(header) < 4:
        print('server closed the connection')
        break
    total_size = struct.unpack('i', header)[0]

    chunks = []
    recv_size = 0
    while recv_size < total_size:
        recv_data = client.recv(min(1024, total_size - recv_size))
        if not recv_data:
            # Server went away mid-reply: show what arrived instead of spinning forever.
            break
        chunks.append(recv_data)
        recv_size += len(recv_data)
    # Decode once, after reassembly: a multi-byte GBK character may be split
    # across two recv() chunks and would fail to decode chunk by chunk.
    print(b''.join(chunks).decode('gbk', errors='replace'))

client.close()
4月15号
# 1、在元类中控制把自定义类的数据属性都变成大写
class Mymeta(type):
    """Metaclass that upper-cases the names of a class's data attributes.

    Methods, descriptors (``property``, ``classmethod``, ``staticmethod``) and
    names starting with a double underscore keep their original names.
    """

    def __new__(cls, name, bases, attrs):
        """Build the class from *attrs* with data-attribute names upper-cased."""
        update_attrs = {}
        for key, value in attrs.items():
            # Functions and nested classes are callable; property/classmethod
            # objects are not, but they expose __get__. Neither kind is data,
            # and renaming them would silently break the class's API.
            is_behaviour = callable(value) or hasattr(value, '__get__')
            if key.startswith('__') or is_behaviour:
                update_attrs[key] = value
            else:
                update_attrs[key.upper()] = value
        return super().__new__(cls, name, bases, update_attrs)
class Foo(metaclass=Mymeta):
    """Demo class: Mymeta stores the ``name`` data attribute as ``NAME``."""

    name = 'fyy'

    def walk(self):
        """Print '<name> walk'."""
        # The metaclass renamed ``name`` to ``NAME``; reading ``self.name``
        # here would raise AttributeError.
        print(f'{self.NAME} walk')


# Show the class namespace: it contains NAME (upper-cased) and walk.
print(Foo.__dict__)
# 2、在元类中控制自定义的类无需__init__方法
class Mymeta(type):
    """Metaclass that builds instances itself, so the class needs no ``__init__``.

    Instantiation accepts keyword arguments only; each keyword becomes an
    instance attribute whose name is upper-cased.
    """

    def __call__(self, *args, **kwargs):
        """Create an instance from keyword arguments.

        Raises:
            TypeError: if any positional argument is given.
        """
        if len(args) > 0:
            raise TypeError('must use keyword argument for key function')
        instance = object.__new__(self)
        upper_cased = {key.upper(): value for key, value in kwargs.items()}
        instance.__dict__.update(upper_cased)
        return instance
class Foo(metaclass=Mymeta):
    """Demo class without ``__init__``; Mymeta fills in the instance attributes."""

    name = 'fyy'

    def walk(self):
        """Print '<name> walk' using the ``name`` attribute."""
        print(f'{self.name} walk')


# Instantiate with keyword arguments only and show the resulting attributes.
p = Foo(
    name='lili',
    age=18,
    sex='male',
)
print(p.__dict__)
# 3、在元类中控制自定义的类产生的对象相关的属性全部为隐藏属性
class Mymeta(type):
    """Metaclass that hides every instance attribute behind a mangled name.

    After ``__init__`` has run, each attribute ``x`` is stored as
    ``_<ClassName>__x`` instead.
    """

    def __call__(self, *args, **kwargs):
        """Create and initialise an instance, then rename its attributes."""
        instance = self.__new__(self)
        self.__init__(instance, *args, **kwargs)
        prefix = f'_{self.__name__}__'
        hidden = {}
        for attr, value in instance.__dict__.items():
            hidden[prefix + attr] = value
        instance.__dict__ = hidden
        return instance
class Foo(object, metaclass=Mymeta):
    """Demo class whose instance attributes are hidden by Mymeta."""

    def __init__(self, name, age):
        """Store *name* and *age* on the instance."""
        self.name, self.age = name, age


# The printed keys carry the '_Foo__' prefix added by the metaclass.
obj = Foo('fyy', '19')
print(obj.__dict__)
# 4、基于元类实现单例模式
class Mymeta(type):
    """Metaclass implementing a default-instance singleton.

    When the class is created, one instance is built with the arguments
    ``('1.1.1.4', 3307)``. Calling the class without arguments returns that
    shared instance; calling it with arguments builds a fresh one.
    """

    def __init__(self, name, bases, dic):
        """Create the shared default instance, then finish normal class setup."""
        self.__instance = object.__new__(self)
        self.__init__(self.__instance, '1.1.1.4', 3307)
        super().__init__(name, bases, dic)

    def __call__(self, *args, **kwargs):
        """Return the shared instance, or a new one when arguments are given."""
        if not (args or kwargs):
            return self.__instance
        fresh = object.__new__(self)
        self.__init__(fresh, *args, **kwargs)
        return fresh
class Mysql(metaclass=Mymeta):
    """Connection settings holder; argument-less calls share one instance."""

    def __init__(self, host, port):
        """Remember the server *host* and *port*."""
        self.host, self.port = host, port


# Calls without arguments return the same shared object, so this prints True.
obj1 = Mysql()
obj2 = Mysql()
print(obj1 is obj2)
# A call with explicit arguments builds a separate instance.
obj4 = Mysql('1.1.1.4', 3307)
4月7号
https://gitee.com/FenYiYuan/python-scs.git
4月3号
https://gitee.com/FenYiYuan/python.git
3月31号
# 作业:
# 1、把登录与注册的密码都换成密文形式
# 3、注册功能改用json实现
import hashlib, json
def login():
    """Prompt for a user name and password and check them against ``user.txt``.

    ``user.txt`` holds one JSON object per line with ``name`` and ``password``
    keys. Prints the outcome; returns ``None`` either way.

    NOTE(security): the stored digest is a truncated MD5 -> SHA-1 chain with a
    fixed salt. It is kept unchanged for compatibility with existing
    ``user.txt`` files but is not suitable for protecting real passwords.
    """
    name = input("用户名:")
    pwd = input("密码:")
    pwd = hashlib.md5(pwd.encode('utf-8')).hexdigest()[1:11]
    sh = hashlib.sha1((pwd + 'fyy').encode('utf-8')).hexdigest()[1:11]
    try:
        with open('user.txt', 'rt', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    # Tolerate blank lines instead of crashing in json.loads.
                    continue
                usermsg = json.loads(line)
                if usermsg.get("name") == name and sh == usermsg.get("password"):
                    print("登录成功")
                    return
    except FileNotFoundError:
        # Nobody has registered yet: treat it as "no matching user".
        pass
    # Reached only after every record was checked without a match.
    print('用户或密码错误')
    return
def rigist():
    """Prompt for a user name and password and append the account to ``user.txt``.

    Each account is written as one JSON object per line with ``name`` and
    ``password`` keys, where ``password`` is the same digest ``login`` computes.

    NOTE(security): the digest is a truncated MD5 -> SHA-1 chain with a fixed
    salt; it is kept unchanged for compatibility but is not suitable for
    protecting real passwords.
    """
    name = input("用户名:")
    pwd = input("密码:")
    pwd = hashlib.md5(pwd.encode('utf-8')).hexdigest()[1:11]
    sh = hashlib.sha1((pwd + 'fyy').encode('utf-8')).hexdigest()[1:11]
    user = {"name": name, 'password': sh}
    with open('user.txt', 'at', encoding='utf-8') as f:
        json.dump(user, f)
        # One record per line, so login() can parse the file line by line.
        f.write('\n')
# Run the interactive login flow.
login()
# 2、文件完整性校验(考虑大文件)
import os, hashlib
def foo(filepath):
    """Return an MD5 hex digest computed from samples of the file at *filepath*.

    The file is divided into 6 equal regions and the first sixth of each
    region is hashed, so large files are checked without being read in full.
    Files too small to sample (fewer than 36 bytes) are hashed completely.

    Raises:
        OSError: if the file cannot be stat'ed or opened.
    """
    size = os.path.getsize(filepath)
    region = size // 6      # distance between two sample offsets
    sample = region // 6    # number of bytes hashed per region
    digest = hashlib.md5()
    with open(filepath, 'rb') as f:
        if sample == 0:
            # Sampling would read nothing at all, making every small file
            # hash identically; hash the whole content instead.
            digest.update(f.read())
        else:
            for i in range(6):
                # Jump to the start of region i (not always to offset 0).
                f.seek(i * region)
                digest.update(f.read(sample))
    return digest.hexdigest()
# Print the digest foo() computes for a sample file.
# NOTE(review): the path has no separators after the drive letter — it looks
# like its backslashes were lost; confirm the intended location.
print(foo(r'D:pycharmoldboy20200330a.txt'))
# 4、项目的配置文件采用configparser进行解析
'''
conf.ini
[section1]
username=egon
is_admin=True
pwd=31
[section2]
hobby=playgame
salary=10000
age=18
sex='男'
'''
import configparser
# Parse the project configuration file with configparser.
config = configparser.ConfigParser()
config.read('conf.ini', encoding='utf-8')

# 1. List the section names.
print(config.sections())

# 2. List the option names of one section.
print(config.options(section='section1'))

# 3. List the (option, value) pairs of one section.
print(config.items(section='section1'))

# 4. Read single values, converted to the wanted type.
username = config.get(section='section1', option='username')
is_admin = config.getboolean(section='section1', option='is_admin')
salary = config.getfloat(section='section2', option='salary')
3月30号
今日作业:
##1、检索文件夹大小的程序,要求执行方式如下
python3.8 run.py 文件夹
import sys, os
def sumfilesize(dirpath):
    """Return the total size in bytes of all regular files below *dirpath*.

    Sub-directories are visited recursively. Symbolic links to directories are
    not followed (avoids cycles and double counting); entries that are neither
    files nor directories are ignored.

    Raises:
        OSError: if *dirpath* (or a sub-directory) cannot be listed.
    """
    # A local total instead of a module-level ``sum`` accumulator: repeated
    # calls give the same answer and the builtin sum() is no longer shadowed.
    total = 0
    for entry in os.listdir(dirpath):
        full_path = os.path.join(dirpath, entry)
        if os.path.isfile(full_path):
            total += os.path.getsize(full_path)
        elif os.path.isdir(full_path) and not os.path.islink(full_path):
            total += sumfilesize(full_path)
    return total


if __name__ == '__main__':
    # Usage: python3.8 run.py <directory>
    if len(sys.argv) != 2:
        sys.exit('usage: python3.8 run.py <directory>')
    print(sumfilesize(sys.argv[1]))
##2、明天上午日考:随机验证码、模拟下载以及打印进度条、文件copy脚本
import random
def vrft_code(size=4):
    """Return a random verification code of *size* characters.

    Each position is chosen at random among an upper-case letter, a digit and
    a lower-case letter.
    """
    picked = []
    for _ in range(size):
        upper = chr(random.randint(65, 90))    # 'A'..'Z'
        lower = chr(random.randint(97, 122))   # 'a'..'z'
        digit = str(random.randint(0, 9))
        picked.append(random.choice((upper, digit, lower)))
    return ''.join(picked)


print(vrft_code())
import time
def download(total_size, recv_size=0, chunk_size=1000, delay=0.1):
    """Simulate a download and draw a progress bar until *total_size* is reached.

    Args:
        total_size: number of bytes to "download".
        recv_size: number of bytes already received when starting.
        chunk_size: bytes received per simulated step.
        delay: seconds slept per step to imitate network latency.
    """
    def progress(percent):
        """Redraw the 50-column bar for a completion ratio (clamped to 1)."""
        if percent > 1:
            percent = 1
        res = int(50 * percent) * '#'
        # '\r' returns to the start of the line so the bar is redrawn in place.
        print('\r[%-50s] %d%%' % (res, int(100 * percent)), end='')

    while recv_size < total_size:
        time.sleep(delay)  # pretend one chunk just arrived
        recv_size += chunk_size
        percent = recv_size / total_size
        progress(percent)
# Demo run: simulate downloading 20000 bytes.
download(20000)
import sys
# File copy script. Usage: python3 copy.py <source> <destination>
if len(sys.argv) != 3:
    sys.exit('usage: python3 copy.py <source> <destination>')
src_file = sys.argv[1]
dst_file = sys.argv[2]

with open(src_file, mode='rb') as read_f, open(dst_file, mode='wb') as write_f:
    # Copy in fixed-size chunks: a binary file may contain no newline at all,
    # in which case iterating "lines" would load it into memory in one piece.
    while True:
        chunk = read_f.read(64 * 1024)
        if not chunk:
            break
        write_f.write(chunk)
3月26号
# 作业:
# 1、文件内容如下,标题为:姓名,性别,年纪,薪资
# egon male 18 3000
# alex male 38 30000
# wupeiqi female 28 20000
# yuanhao female 28 10000
#
# 要求:
# 从文件中取出每一条记录放入列表中,
# 列表的每个元素都是{'name':'egon','sex':'male','age':18,'salary':3000}的形式
# Exercise 1: load every "name sex age salary" record of a.txt into a list of
# dicts such as {'name': 'egon', 'sex': 'male', 'age': 18, 'salary': 3000}.
info = []
with open('a.txt', 'rt', encoding='utf-8') as f:
    for line in f:
        fields = line.split()  # split once per line instead of four times
        if not fields:
            continue  # tolerate blank lines (e.g. a trailing newline)
        name, sex, age, salary = fields[:4]
        info.append({
            'name': name,
            'sex': sex,
            'age': int(age),
            'salary': int(salary),
        })
print(info)

# Exercise 2: the record of the person with the highest salary.
print(max(info, key=lambda dic: dic['salary']))

# Exercise 3: the record of the youngest person.
print(min(info, key=lambda dic: dic['age']))

# Exercise 4: upper-case every name, once with a comprehension and once with map().
names = ['egon', 'alex_sb', 'wupeiqi', 'yuanhao']
print([name.upper() for name in names])
print(list(map(lambda x: x.upper(), names)))

# Exercise 5: drop the names ending in 'sb', then keep the LENGTH of each
# remaining name (the task asks for the lengths, not the names themselves).
print(list(filter(lambda name: not name.endswith('sb'), names)))
print([len(name) for name in names if not name.endswith('sb')])

# Exercise 6: length (in characters, newline included) of the longest line of a.txt.
with open('a.txt', 'rt', encoding='utf-8') as f:
    tmp = [len(line) for line in f]
# default=0 keeps an empty file from raising ValueError.
print(max(tmp, default=0))

# Exercise 7: total number of characters in a.txt. ``tmp`` is a list, so it
# can be summed repeatedly; a generator would be exhausted after the first
# sum() and every later sum() would give 0.
print(sum(tmp))
# 8、思考题
# Exercise 8 (thought question): the generator is created inside the with
# block but only consumed by sum() after the block has closed the file.
with open('a.txt') as f:
    g = (len(row) for row in f)
print(sum(g))  # why does this raise?
g 是生成器,取值是惰性的:执行 sum(g) 时 with 代码块已经结束、文件 f 已被关闭,此时才真正去读取文件,所以报 ValueError: I/O operation on closed file
# 9、文件shopping.txt内容如下
# mac,20000,3
# lenovo,3000,10
# tesla,1000000,10
# chicken,200,1
# 求总共花了多少钱?
# Exercise 9: shopping.txt holds one "name,price,count" record per line.
# Parse the file once and derive all three answers from the parsed records.
goods = []
with open('shopping.txt', 'rt', encoding='utf-8') as f:
    for line in f:
        if not line.strip():
            continue  # tolerate blank lines (e.g. a trailing newline)
        name, price, count = line.split(',')
        goods.append({
            'name': name,
            'price': int(price),
            'count': int(count),  # int() ignores the trailing newline
        })

# Total amount spent.
cost = sum(item['price'] * item['count'] for item in goods)
print(cost)

# Every product, formatted as [{'name': 'xxx', 'price': 333, 'count': 3}, ...].
print(goods)

# Products whose unit price is greater than 10000, same format.
info = [item for item in goods if item['price'] > 10000]
print(info)
# 10、思考:判断下述说法是否正确
# 题目1:
# 1、应该将程序所有功能都扔到一个模块中,然后通过导入模块的方式引用它们
错误
# 2、应该只将程序各部分组件共享的那一部分功能扔到一个模块中,然后通过导入模块的方式引用它们
正确
#
# 题目2:
# 运行python文件与导入python文件的区别是什么?
运行:文件被当作程序(脚本)直接执行,此时 __name__ 的值为 '__main__';导入:文件被当作模块加载,此时 __name__ 的值为模块名。
# 运行的python文件产生的名称空间何时回收,为什么?
文件程序执行完回收名称空间
# 导入的python文件产生的名称空间何时回收,为什么?
被导入模块的名称空间随模块对象缓存在 sys.modules 中,通常在程序运行结束时才回收;只有当该模块不再被任何地方引用(包括 sys.modules)时,才可能提前回收
3月25号
# 1、文件内容如下,标题为:姓名,性别,年纪,薪资
# 要求:
# 从文件中取出每一条记录放入列表中,
# 列表的每个元素都是{'name':'egon','sex':'male','age':18,'salary':3000}的形式
a.txt
# egon male 18 3000
# alex male 38 30000
# wupeiqi female 28 20000
# yuanhao female 28 10000
# Exercise 1: load every "name sex age salary" record of a.txt into a list of
# dicts; all values are kept as the strings read from the file.
info = []
with open('a.txt', 'rt', encoding='utf-8') as f:
    for line in f:
        fields = line.split()  # split once per line instead of four times
        if not fields:
            continue  # tolerate blank lines (e.g. a trailing newline)
        name, sex, age, salary = fields[:4]
        info.append({'name': name, 'sex': sex, 'age': age, 'salary': salary})
print(info)

# Exercise 2: sum of all salaries, once from a generator and once from a list.
data = (dic['salary'] for dic in info)
print(sum(int(i) for i in data))
print(sum(int(i) for i in [dic['salary'] for dic in info]))

# Exercise 3: names of all men. Only the 'sex' field is compared; scanning
# every value would also match a person whose *name* happens to be 'male'.
print([dic['name'] for dic in info if dic['sex'] == 'male'])

# Exercise 4: capitalize every name, first as a mapped copy ...
info_new = map(lambda item: {'name': item['name'].capitalize(),
                             'sex': item['sex'],
                             'age': item['age'],
                             'salary': item['salary']}, info)
print(list(info_new))
# ... then in place.
for dic in info:
    dic['name'] = dic['name'].capitalize()
print(info)

# Exercise 5: drop the people whose name starts with 'a'. The names were just
# capitalized in place, so the comparison has to ignore case — a plain
# startswith('a') would no longer match anyone.
print([dic for dic in info if not dic['name'].lower().startswith('a')])
# 6 使用递归打印斐波那契数列(前两个数的和得到第三个数,如:0 1 1 2 3 5 8...)
def foo(n):
    """Return the *n*-th Fibonacci number, counting from ``foo(1) == 0``.

    The sequence is 0 1 1 2 3 5 8 ...; the value is computed recursively.

    Raises:
        ValueError: if *n* is smaller than 1 (the naive recursion would never
            reach a base case).
    """
    if n < 1:
        raise ValueError('n must be >= 1')

    def pair(k):
        """Return ``(F(k), F(k + 1))`` using one recursive call per step."""
        if k == 1:
            return 0, 1
        prev, cur = pair(k - 1)
        return cur, prev + cur

    # Carrying two neighbours through the recursion makes it linear instead
    # of recomputing both sub-results at every level.
    return pair(n)[0]
def foo(n):
    """Print the first *n* Fibonacci numbers (0 1 1 2 3 5 ...), one per line.

    Prints nothing when *n* is 0 or negative. Returns ``None``.
    """
    x, y = 0, 1
    for _ in range(n):
        print(x)
        x, y = y, x + y


# foo() prints the numbers itself and returns None, so its result is not
# printed (that would add a stray "None" line).
foo(6)
def fib(a, b, stop):
    """Print the sequence starting with *a*, *b* on one line until a term exceeds *stop*.

    Each following term is the sum of the two before it; terms are separated
    by a single space. Returns ``None``.
    """
    if not a > stop:
        print(a, end=' ')
        return fib(b, a + b, stop)
    return None
# 7 一个嵌套很多层的列表,如l=[1,2,[3,[4,5,6,[7,8,[9,10,[11,12,13,[14,15]]]]]]],用递归取出所有的值
def foo(obj):
    """Print every non-list value found in the nested list *obj*, depth first."""
    for item in obj:
        if type(item) is not list:
            print(item)
            continue
        # A nested list: descend into it.
        foo(item)


l = [1, 2, [3, [4, 5, 6, [7, 8, [9, 10, [11, 12, 13, [14, 15]]]]]]]
foo(l)