youku系统

youku系统

需求分析

仿优酷项目:
	功能介绍:
		1.基于tcp协议写一个类视频客户端
		2.普通用户可以登录,注册(密码通过md5加密),登录成功后显示最新一条公告
		3.普通用户可以充值
		3.1 可以开通会员
		4.普通用户可以查看所有视频(打印所有视频名称)
		5.普通用户可以下载普通视频(非会员下载普通视频需要30s广告,会员下载无需等待)
		6.普通用户可以下载收费视频(非会员下载收费10元,会员下载收费5元)
		7.普通用户可以查看观影记录(电影下载记录)
		8.普通用户可以查看公告(历史公告)
		9.管理员可以登录,注册
		10.管理员可以上传普通视频,上传收费视频
		11.管理员可以删除视频(并不是真正删除,只是用户看不到了)
		12.管理员可以发布公告
		13.管理员扩展功能:如查看总用户量,锁定,解锁用户

		
	功能分析:
		管理员功能:1、登录 2、注册 3、上传视频 4、删除视频 5、发布公告, 6.用户管理
		用户功能:1、登录(成功后立即展示最新的公告) 2、注册 3、查看所有视频
		4、下载视频 5、查看公告 6、查看观影记录,7开通会员,8充值

	用户表
		id  用户名  密码  会员状态  余额   用户类型  是否被锁定 

	下载记录表      与用户表做关联  
		
		电影名称  用户id   下载时间 

 
	公告表       与用户表管理
		id  标题  内容  发布时间 管理员id

	电影表     与用户表关联
		id  名字  路径 上传时间  上传人   是否收费   是否删除 
		
	分析需求
		1.是否可实现 
		2.该需求设计到那张表
		3.该需求设计到哪个字段 
		4.分析表之间关系  

客户端 client

BIN目录

import os,sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from YKClient.core import src

if __name__ == '__main__':
    src.run()

conf目录

#监听端口
PORT = 9999
# 服务器IP
IP = "127.0.0.1"

core

############
# admin.py #
############

"""
3、上传视频 4、删除视频 5、发布公告, 6.用户管理
"""
import os

from YKClient.lib.file_tool import get_md5
from YKClient.core.client_socket import client

current_id = None


def upload_movie():
    path = input("请输入文件路径:
").strip()
    if os.path.exists(path) and os.path.isfile(path):
        file_info = {"func": "upload",
                     "filename": os.path.basename(path),
                     "filesize": os.path.getsize(path),
                     "md5": get_md5(path),
                     "uid": current_id}

        free = input("是否收费(y:收费)").strip()
        if free == "y":
            file_info["is_free"] = "1"
        else:
            file_info["is_free"] = "0"

        resp = client.send_file(path, file_info)
        print(resp)
    else:
        print("路径错误,必须存在的文件!")


def delete_movie():
    """
    1.获取电影的id和名字列表
    2.选择一个要删除的
    3.将id传给服务器
    4.接收响应结果
    """
    req = {"func": "get_movie_list"}
    client.send_request(req)
    resp = client.recv_response()  # {"code":200,"movies":[{"id":"123","name":"十面埋伏"}]}
    if not resp["movies"]:
        print("没有电影....")
        return

    for m_dic in resp["movies"]:
        print(m_dic)

    res = input("请输入要删除电影id:
").strip()
    if res.isdigit():
        res = int(res)

    if res in [d["id"] for d in resp["movies"]]:
        req2 = {"func": "delete", "id": res}
        client.send_request(req2)
        resp2 = client.recv_response()
        print(resp2)
    else:
        print("id错误!")


def publish_notice():
    """
    1.输入标题
    2.输入内容
    3.发送给服务器 必须包含func uid
    4.接收响应
    """
    title = input("请输入标题:").strip()
    content = input("请输入内容:").strip()
    if title and content:
        req = {"func": "publish_notice", "title": title, "content": content, "uid": current_id}
        client.send_request(req)
        response = client.recv_response()
        print(response)
    else:
        print("标题或内容不能为空!")


def user_manage():
    """
    1.获取所有的用户信息 展示
    2.选择需要处理的用户id
    3.询问解锁还是锁定
    4.发送请求 包含 func 用户id  锁定状态
    5.接收响应
    """
    req = {"func": "get_users"}
    client.send_request(req)
    resp = client.recv_response()  # {"users":[{"id":"123","name":"张三","is_lock":0}]}
    print("当前用户数量%s" % len(resp["users"]))
    for u in resp["users"]:
        print(u)

    id = input("请输入要操作的id:
").strip()
    if id.isdigit():
        id = int(id)

    if id in [u["id"] for u in resp["users"]]:
        lock = input("解锁输入:0 锁定输入:1").strip()
        if lock == "0" or lock == "1":
            req2 = {"func": "lock", "id": id, "is_lock": int(lock)}
            client.send_request(req2)
            resp2 = client.recv_response()
            print(resp2)
        else:
            print("输入错误")


funcs = {"1": upload_movie, "2": delete_movie, "3": publish_notice, "4": user_manage}


def views(id):
    global current_id
    current_id = id
    print("尊敬的管理员,欢迎登陆...")
    while True:
        res = input("请选择功能:
1.上传视频
2.删除视频
3.发布公告
4.用户管理
q.返回
").strip()
        if res == "q":
            break
        if res in funcs:
            funcs[res]()
        else:
            print("输入错误!")
    print("返回上一级.....")

####################
# client_socket.py #
####################

import json
import socket
import struct

from conf.settings import *
class Client:

    def __init__(self):
        soc = socket.socket()
        soc.connect((IP,PORT))
        self.__soc = soc
        print("连接服务器成功!")

    def recv_response(self):
        """接收响应数据"""
        len_bytes = self.__soc.recv(4)
        head_len = struct.unpack("i", len_bytes)[0]

        data_bytes = self.__soc.recv(head_len)
        json_data = json.loads(data_bytes.decode("utf-8"))

        return json_data

    def send_request(self,req):
        "发送请求"
        json_bytes = json.dumps(req).encode("utf-8")
        len_bytes = struct.pack("i", len(json_bytes))

        self.__soc.send(len_bytes)
        self.__soc.send(json_bytes)

    def send_file(self,path,info):
        self.send_request(info)
        # 发送文件数据
        f = open(path, "rb")
        while True:
            data = f.read(1024)
            if not data:
                break
            self.__soc.send(data)
        return self.recv_response()

# 创建客户端连接对象
client = Client()
##########
# src.py #
##########


from core.user import register
from .client_socket import *
from lib import file_tool
from core import admin,user


def login():
    name = input("用户名:").strip()
    pwd = input("密码:").strip()
    req = {"func":"login","name":name,"pwd":file_tool.get_md5_password(pwd)}

    if name and pwd:
        client.send_request(req)
        resp = client.recv_response()

        if resp["status"]:
            print("登录成功!")
            if resp["usertype"] == 0:
                user.views(resp["id"])
            else:
                admin.views(resp["id"])
        else:
            print(resp["msg"])

    else:
        print("用户名或密码不能为空。。。")

funcs = {"1":login,"2":register}
def run():
    while True:
        res= input("请选择功能:
1.统一登录
2.用户注册
q.退出
").strip()
        if res == "q":
            break
        if res in funcs:
            funcs[res]()
        else:
            print("输入错误!")
    print("see you la la...")



############
# user.py #
###########

from lib import file_tool
from .client_socket import *

current_id = None

def register():
    name = input("用户名:").strip()
    pwd = input("密码:").strip()

    req = {"func": "register", "name": name, "pwd": file_tool.get_md5_password(pwd)}

    if name and pwd:
        client.send_request(req)
        resp = client.recv_response()
        print(resp)
    else:
        print("用户名或密码不能为空。。。")

"""
1、登录(成功后立即展示最新的公告) 2、注册 3、查看所有视频
		4、下载视频 5、查看公告 6、查看观影记录,7开通会员,8充值
"""

def show_movies():
    req = {"func":"get_movie_list"}
    client.send_request(req)
    resp = client.recv_response()
    movies = resp["movies"]
    if not movies:
        print("还没有电影,管理员小哥哥正在加班上传...")
        return

    for i in movies:
        print("%-6s`%s`" % (i["id"], i["name"]))

    return movies

def download_movie():
    print("电影列表如下:")
    ms = show_movies()
    if not ms:
        return
    id = input("请输入电影的ID:").strip()
    if id in [str(m["id"]) for m in ms]:
        req = {"func":"download","id":id,"uid":current_id}
        client.send_request(req)
        res = client.recv_file()
        # 下载成功了! 要上传下载记录
        if res:
            req = {"func":"up_history","uid":current_id,"mid":id}
            client.send_request(req)
            client.recv_response() # 请求响应模型  一次请求必须对应一次响应 否则会粘包问题

    else:
        print("id 不存在!")

def show_notices():
    req = {"func":"get_notices"}
    client.send_request(req)
    resp = client.recv_response()# {"notices":[{time,content},{}}
    if not resp["notices"]:
        print("没有公告!")
        return
    print("========notices=========")
    for i in resp["notices"]:
        print("title:%s
content:%s" % (i["title"],i["content"]))
    print("==========end===========")


def download_history():
    req = {"func":"download_history","uid":current_id}
    client.send_request(req)
    resp = client.recv_response() #{"histories":[{downtime,mname},{}]}
    if not resp["histories"]:
        print("没有记录!")
        return
    print("========下载历史=======")
    for i in resp["histories"]:
        print("%s : %s" % (i["downtime"],i["mname"]))
    print("==========end=========")


def open_vip():
    """
    假设会员费用为30,
    传递用户的id给服务器
    服务器判断这个用户当前是否已经是会员
    如果不是,则判断余额是否足够
    如果如果足够则扣钱 并将vip改为1
    如果不足则返回提示信息
    """
    req = {"func":"openvip","uid":current_id}
    client.send_request(req)
    resp = client.recv_response()
    print(resp["msg"])


def recharge():
    """
    输入充值金额
    将金额与uid传给服务器
    服务器修改用户余额字段
    并返回后充值后的余额信息
    """
    money = input("请输入充值金额:").strip()
    try:
        float(money)
        money = float(money)
    except:
        print("输入不正确,必须是数字!")
        return

    req = {"func":"recharge","money":money,"uid":current_id}
    client.send_request(req)
    resp = client.recv_response()
    print(resp["msg"])






funcs = {"1":show_movies,"2":download_movie,"3":show_notices,"4":download_history,"5":open_vip,"6":recharge}

def views(id):
    global current_id
    current_id = id
    print("欢迎进入优酷系统....")
    while True:
        res= input("请选择功能:
1.查看视频
2.下载视频
3.查看公告
4.下载历史
5.开通会员
6.充值
q.返回
").strip()
        if res == "q":
            break
        if res in funcs:
            funcs[res]()
        else:
            print("输入错误!")
    print("返回上一级...")


db目录

lib目录

################
# file_tool.py #
################

import hashlib

salt = "com.oldboy.www"


def get_md5(path):
    md5 = hashlib.md5()
    f = open(path,"rb")
    while True:
        data = f.read(1024)
        if not data:
            break
        md5.update(data)
    f.close()
    return md5.hexdigest()

def get_md5_password(password):
    md5 = hashlib.md5()
    md5.update(password.encode("UTF-8"))
    md5.update(salt.encode("GBK"))
    pwd = md5.hexdigest()
    return pwd


服务端 server

bin 目录

############
# start.py #
############

import os,sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from YKClient.core import src


if __name__ == '__main__':
    src.run()

conf 目录

###############
# settings.py #
###############

import os
BASE_PATH = os.path.dirname(os.path.dirname(__file__))

VIDEOS_PATH = os.path.join(BASE_PATH,"videos")

db 目录

#############
# models.py #
#############

from db.ORM.base_model import *
class User(Model):
    """
    id  用户名  密码  会员状态  余额   用户类型  是否被锁定
    """
    id = IntegerField(is_primary=True, increment=True)
    name = StringField(length=20)
    pwd = StringField(length=32)
    vip = IntegerField()
    extra = FloatField()
    utype = IntegerField()
    islock = IntegerField()

class History(Model):
    """
    电影名称  用户id   下载时间
    """
    id = IntegerField(is_primary=True, increment=True)
    mname = StringField(length=50)
    downtime = DatetimeField()
    u_id = IntegerField(foreign=(User.__name__, "id"))



class Notice(Model):
    """id  标题  内容  发布时间 管理员id"""
    id = IntegerField(increment=True,is_primary=True)
    title = StringField(length=50)
    content = StringField(length=500)
    pub_time = DatetimeField()
    u_id = IntegerField(foreign=(User.__name__,"id"))


class Movie(Model):
    """id  名字  路径 上传时间  上传人   是否收费   是否删除 """
    id = IntegerField(increment=True, is_primary=True)
    name = StringField(length=50)
    path = StringField(length=1000)
    up_time = DatetimeField()
    u_id = IntegerField(foreign=(User.__name__, "id"))
    is_free = IntegerField(default=0) # 0表示免费 1表示收费
    is_delete = IntegerField(default=0) # 0表示未删除 1表示被删除

orm 框架 (放于db中)

#################
# base_model.py #
#################


from .ZS_POOL import ConnectPool
# 忽略警告
import warnings
warnings.filterwarnings("ignore")

class IntegerField:
    text = "int"

    # foreign属性的值类型 : (类,字段名称)
    def __init__(self, foreign=None, default=0, is_primary=False, increment=False):
        self.default = default
        self.is_primary = is_primary
        self.increment = increment
        self.foreign = foreign


class StringField:
    text = "varchar"
    def __init__(self, default=None, length=1):
        self.default = default
        self.length = length

class FloatField:
    text = "float"

    def __init__(self, default=0):
        self.default = default


class DatetimeField:
    # text = "datetime"
    def __init__(self, auto_update=False):
        self.auto_update = auto_update
        if auto_update:
            self.text = "timestamp"
        else:
            self.text = "datetime"


# 创建连接池
pool = ConnectPool()


class TableMeta(type):
    def __init__(self, classname, bases, dicts):
        super().__init__(classname, bases, dicts)
        if self.__name__ == "Model": return # 如果是基类则跳过

        dicts = {k: v for k, v in dicts.items() if not k.startswith("__")}  # 去除默认的属性只保留字段信息
        sql = "create table if not exists %s(" % classname  # 拼接表名
        foreigns = {}
        # 遍历所有字段
        for k, v in dicts.items():
            sql += k  # 字段名称
            sql += (" " + v.text)  # 字段类型
            # 整型的处理
            if isinstance(v, IntegerField):
                if v.is_primary:
                    sql += " primary key"
                    v.default = None
                if v.increment:
                    sql += " auto_increment"
                if v.foreign:  # 记录所有外键
                    foreigns[k] = v
            # 字符类型的处理
            elif isinstance(v, StringField):
                sql += "(%s)" % v.length
            # 默认值处理
            if not isinstance(v, DatetimeField):
                if v.default != None:
                    sql += " default '%s'" % v.default
            sql += ","

        # 如果存在外键关联
        for k, v in foreigns.items():
            sql += " foreign key(%s) references %s(%s)," % (k, v.foreign[0], v.foreign[1])

        # 去除最后的逗号
        sql = sql[:-1]
        sql += ")"

        pool.execute_sql(sql)

class Model(metaclass=TableMeta):
    def save(self):
        l = len(self.__dict__)
        l = l * "%s,"
        l = l[:-1]  # 得到对应的百分号

        ks = str(list(self.__dict__.keys())).strip("][") # 得到对应字段们
        ks = ks.replace("'","")
        vs = list(self.__dict__.values()) # 得到对应的值们

        sql = "insert into %s(%s) values(%s)" % (self.__class__.__name__,ks,l)
        # 需要的sql格式:
        # insert into History(id, downtime, mname) values(%s,%s,%s)
        pool.execute_sql(sql,vs)

    def delete(self):
        print(self.get_primary())
        primary = self.get_primary()
        sql = "delete from %s where %s = %%s" % (self.__class__.__name__,primary)
        args = (self.get_primary_value(),)
        # print(sql)
        # print(args)
        # 需要的sql格式:
        # delete from History where id = %s
        return pool.execute_sql(sql,args)[0]

    def update(self):
        values = ""
        vs = []
        for k,v in self.__dict__.items():
            values += " %s = %%s," % k
            vs.append(v)
        values = values[:-1]
        sql = "update %s set %s where %s = %%s" %(self.__class__.__name__,
                                                 values,self.get_primary(),)
        vs.append(self.get_primary_value())

               # 需要的sql 格式:
        # update History set  id = %s, downtime = %s, mname = %s where id = %s
        return pool.execute_sql(sql,vs)[0]


    @classmethod
    def get_objects(cls,**kwargs):
        # 判断是否有条件
        if kwargs:
            sql = "select *from %s where " % cls.__name__
        else:
            sql = "select *from %s" % cls.__name__

        for k,v in kwargs.items():
            sql += "%s = %%s and "  % k
        sql = sql.rstrip("and ")

        count,res = pool.execute_sql(sql,list(kwargs.values()))

        objs = []
        # 将查询结果转为对象
        for u in res:
            obj = cls()
            # 直接覆盖对象的__dict__
            obj.__dict__.update(u)
            objs.append(obj)
        return objs

        # 需要的格式 "select *from 表名 where name = %s and 字段 = 值"

    @classmethod
    def get_object(cls, **kwargs):
        res = cls.get_objects(**kwargs)
        if res:
            return res[0]

    # 当查询条件非常复杂是 使用该函数来执行原生sql语句  自己处理结果
    @classmethod
    def execute_sql(cls,sql,args):
        return pool.execute_sql(sql,args)

    #获取主键的字段名
    def get_primary(self):
        for k,v in self.__class__.__dict__.items():
            if isinstance(v,IntegerField) and v.is_primary == True:
                return k

    # 获取主键的值
    def get_primary_value(self):
        return getattr(self,self.get_primary(),None)

###################
# OMR_settings.py #
###################

# 最大连接数量
MAX_CONNECT = 2

mysql_info = {
    "user": "root",
    "password": "123",
    "database": "youku",
    "host": "127.0.0.1"
}

##############
# ZS_POOL.py #
##############

import pymysql
from queue import Queue
from .ORM_settings import *
from threading import Lock

class ConnectPool:
    max = MAX_CONNECT
    # 记录创建的连接数量
    connect_count = 0
    # 创建一个容器用于存储连接
    queue = Queue(-1)
    # 线程锁
    lock = Lock()

    # 获取连接
    def __get_connect(self):
       #判断容器中是否有现成的
        if self.queue.qsize():
            return self.queue.get()
        else:
            # 以及创建的连接数量小于最大限度  那就创建
            self.lock.acquire()
            if ConnectPool.connect_count < self.max:
                c = self.__create_connect()
                self.queue.put(c)

            self.lock.release()
            return self.queue.get()

    def __return_connect(self,conn):
        self.queue.put(conn)

    # 创建新连接
    def __create_connect(self):
        conn = pymysql.connect(**mysql_info)
        ConnectPool.connect_count  += 1
        return conn

    def execute_sql(self,sql,args=None):
        conn = self.__get_connect()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        count = cursor.execute(sql,args)
        conn.commit()
        res = cursor.fetchall()
        self.__return_connect(conn)
        return count,res

if __name__ == '__main__':
    pool = ConnectPool()
    # pool = ConnectPool()
    # res = pool.execute_sql("select *from t1")
    # print(res)
    # #
    # res = pool.execute_sql("select *from t1")
    # print(res)
    # from threading import Thread
    #
    # def task():
    #     pool = ConnectPool()
    #     res = pool.execute_sql("select *from t1")
    #
    # Thread(target=task).start()
    # Thread(target=task).start()
    # Thread(target=task).start()

interfaces目录

######################
# admin_interface.py #
######################


import datetime
import os,shutil
from db.models import *
from lib.file_tool import get_md5
from conf.settings import *

def login(req):
    name = req["name"]
    pwd = req["pwd"]
    #判断用户名密码是否正确
    user_obj = User.get_object(name=name)
    if user_obj and user_obj.pwd == pwd:
        if user_obj.islock == 1:
            return {"msg": "登录失败! 账户被锁定!", "status": False}
        return  {"msg":"登录成功","status":True,"usertype":user_obj.utype,"id":user_obj.id}
    return {"msg": "登录失败! 用户名或密码错误!", "status": False}


def register(req):
    user_obj = User.get_object(name=req["name"])
    if user_obj:
        return {"msg":"用户已经存在!","status":False}

    new_user = User()
    new_user.name = req["name"]
    new_user.pwd = req["pwd"]
    new_user.save()
    return {"msg":"用户注册成功!","status":True}


def upload(req):
    print(req)
    # 将视频信息插入到数据库
    obj = Movie.get_object(name = req["filename"])
    if obj:
        # 判断MD5值
        md5 = get_md5(obj.path)
        if md5 == req["md5"]:
            return {"msg":"该视频已经存在了!","status":False}
    #将文件从临时目录移动到真实目录
    real_path = os.path.join(VIDEOS_PATH,req["filename"])
    shutil.move(req["file_path"],real_path)
    # 插入数据库信息
    m = Movie()
    m.name = req["filename"]
    m.path = real_path
    m.up_time = str(datetime.datetime.now())
    m.u_id = req["uid"]
    m.is_free = req["is_free"]

    m.save()
    return {"msg":"上传成功!","status":True}


def get_movie_list(req):
    objs = Movie.get_objects(is_delete=0)
    movies = []
    for obj in objs:
        movies.append({"id":obj.id,"name":obj.name})
    return {"movies":movies}

def delete(req):
    id = req["id"]
    obj = Movie.get_object(id=id)
    obj.is_delete = 1
    obj.update()

    return {"msg":"删除成功!","status":True}

def publish_notice(req):
    notice = Notice()
    notice.title = req["title"]
    notice.content = req["content"]
    notice.u_id = req["uid"]
    notice.pub_time = str(datetime.datetime.now())
    notice.save()
    return  {"msg":"发布成功!","status":True}

def get_users(req):
    objs = User.get_objects(utype=0)
    users = []
    for obj in objs:
        users.append({
            "id":obj.id,
            "name":obj.name,
            "is_lock":obj.islock
        })
    return {"users":users,"status":True}

def lock(req):
    id = req["id"]
    is_lock = req["is_lock"]
    u = User.get_object(id=id)
    u.islock = is_lock
    u.update()

    return {"msg":"操作成功!","status":True}

lib目录

webServer目录

###########
# core.py #
###########

import socket, json, struct, logging
import traceback
from .server_settings import *
from concurrent.futures import ThreadPoolExecutor
from lib import file_tool


class Service:
    # 在初始化函数中来创建socket并监听端口
    def __init__(self):
        server_soc = socket.socket()
        server_soc.bind((IP, PORT))
        server_soc.listen()
        self.server_soc = server_soc
        self.pool = ThreadPoolExecutor(2000)
        self.__get_logger()
        self.logger.info("server listing.....")

    def __get_logger(self):
        logger = logging.getLogger("server")
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter("[%(asctime)s %(message)s]"))
        logger.addHandler(handler)
        self.logger = logger

    def __client_handle(self, client):
        """处理客户端"""
        try:
            while True:
                request = self.__recv_request(client)
                func_name = request.get("func")  # 取出要执行的函数名称
                # 如果客户端没有穿func参数则返回错误信息
                if  not func_name:
                    self.__send_response(client, {"code":500,"mgs":"error:参数必须包含func.."})
                    continue

                #判断是否是上传请求
                if func_name == "upload":
                    self.__recv_file(request,client)

                if func_name in mapping:  # 判断函数是否存在
                    res = mapping[func_name](request)  # 取出函数执行

                    res["code"] = "200"  # 标识请求是成功的! 但是并不意味业务逻辑处理成功
                else:
                    res = {"code": "500", "msg": "没有这个功能!"}

                # 判断是否是文件下载请求
                file_path = res.get("download_path")
                if file_path: # 是则发送文件
                    self.__send__file(client,file_path)
                else:
                    #不是则直接返回响应结果
                    self.__send_response(client, res)

        except Exception as e:
            self.logger.info("%s: disconnect because: %s" % (client.getpeername(),e))
            client.close()
            traceback.print_exc()

    def __recv_request(self, client):
        """接收请求数据"""
        len_bytes = client.recv(4)
        head_len = struct.unpack("i", len_bytes)[0]

        data_bytes = client.recv(head_len)
        json_data = json.loads(data_bytes.decode("utf-8"))

        return json_data

    def __send_response(self, client, res):
        json_bytes = json.dumps(res).encode("utf-8")
        len_bytes = struct.pack("i", len(json_bytes))

        client.send(len_bytes)
        client.send(json_bytes)

    def listen_client(self):
        while True:
            c_soc, _ = self.server_soc.accept()
            self.pool.submit(self.__client_handle, c_soc)
            self.logger.info("a client connected.....")

    # 接收文件的函数
    def __recv_file(self,req,client):
        file_path = os.path.join(TEMP_PATH,req["filename"])
        if not os.path.exists(TEMP_PATH):
            os.makedirs(TEMP_PATH)

        recv_size = 0
        total_size = req["filesize"]
        buffer_size = 1024
        f = open(file_path,"wb")
        while recv_size < total_size:
            # 如果未接收大于缓冲区
            if total_size - recv_size > buffer_size:
                data = client.recv(buffer_size)
            else:
                # 未接收小于缓冲区
                data = client.recv(total_size-recv_size)
            f.write(data)
            recv_size += len(data)
        f.close()
        self.logger.info("%s上传完毕 path: %s " % (req["filename"],file_path))
        req["file_path"] = file_path

    def __send__file(self,client,path):
        info = {"filename": os.path.basename(path), "filesize": os.path.getsize(path),
                "MD5": file_tool.get_md5(path)}

        # 发送文件信息
        self.__send_response(client, info)

        # 发送文件数据
        f = open(path, "rb")
        while True:
            data = f.read(1024)
            if not data:
                break
            client.send(data)
        self.logger.info("%s 传输完毕!" % info["filename"])

###############
# settings.py #
###############

import os
from interfaces import admin_interface,user_interface

#监听端口
PORT = 9999
# 服务器IP
IP = "127.0.0.1"
# 最大线程数量
MAX_CONNECT= 2000


#业务逻辑函数 与 key的对应关系
mapping = {
}
mapping.update(admin_interface.__dict__)
mapping.update(user_interface.__dict__)




BASE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

# 临时文件的目录
TEMP_PATH = os.path.join(BASE_PATH,"tmp")



################
# file_tool.py #
################

import hashlib

salt = "com.oldboy.www"


def get_md5(path):
    md5 = hashlib.md5()
    f = open(path,"rb")
    while True:
        data = f.read(1024)
        if not data:
            break
        md5.update(data)
    f.close()
    return md5.hexdigest()


def get_md5_password(password):
    md5 = hashlib.md5()
    md5.update(password.encode("UTF-8"))
    md5.update(salt.encode("GBK"))
    return md5.hexdigest()

tmp

##############
# settingspy #
##############

#监听端口
PORT = 9999
# 服务器IP
IP = "127.0.0.1"
###########
# user.py #
###########


from lib import file_tool
from .client_socket import *
def register():
    name = input("用户名:").strip()
    pwd = input("密码:").strip()

    req = {"func": "register", "name": name, "pwd": file_tool.get_md5_password(pwd)}

    if name and pwd:
        client.send_request(req)
        resp = client.recv_response()
        print(resp)
    else:
        print("用户名或密码不能为空。。。")

"""
1、登录(成功后立即展示最新的公告) 2、注册 3、查看所有视频
		4、下载视频 5、查看公告 6、查看观影记录,7开通会员,8充值
"""

def show_movies():
    pass

def download_movie():
    pass

def show_notices():
    pass

def download_history():
    pass

def open_vip():
    pass

def recharge():
    pass


funcs = {"1":show_movies,"2":download_movie,"3":show_notices,"4":download_history,"5":open_vip,"6":recharge}

def views(id):
    global current_id
    current_id = id
    print("欢迎进入优酷系统....")
    while True:
        res= input("请选择功能:
1.查看视频
2.下载视频
3.查看公告
4.下载历史
5.开通会员
6.充值
q.返回").strip()
        if res == "q":
            break
        if res in funcs:
            funcs[res]()
        else:
            print("输入错误!")
    print("返回上一级...")

videos目录

############
# admin.py #
############

"""
3、上传视频 4、删除视频 5、发布公告, 6.用户管理
"""
import os
from lib.file_tool import get_md5
from core.client_socket import client



def upload_movie():
    path = input("请输入文件路径:
").strip()
    if os.path.exists(path) and os.path.isfile(path):
        file_info = {"func": "upload",
                     "filename": os.path.basename(path),
                     "filesize": os.path.getsize(path),
                     "md5": get_md5(path),
                     "uid":current_id}


        free = input("是否收费(y:收费)").strip()
        if free == "y":
            file_info["is_free"] = "1"
        else:
            file_info["is_free"] = "0"


        resp = client.send_file(path,file_info)
        print(resp)
    else:
        print("路径错误,必须存在的文件!")


def delete_movie():
    pass

def publish_notice():
    pass


def user_manage():
    pass


funcs = {"1":upload_movie,"2":delete_movie,"3":publish_notice,"4":user_manage}

def views(id):
    global current_id
    current_id = id
    print("尊敬的管理员,欢迎登陆...")
    while True:
        res = input("请选择功能:
1.上传视频
2.删除视频
3.发布公告
4.用户管理
q.返回").strip()
        if res == "q":
            break
        if res in funcs:
            funcs[res]()
        else:
            print("输入错误!")
    print("返回上一级.....")
###############
# settings.py #
###############

#监听端口
PORT = 9999
# 服务器IP
IP = "127.0.0.1"
原文地址:https://www.cnblogs.com/bladecheng/p/11236057.html