ATM项目

项目

#一个项目是怎么从无到有的

#1.需求分析
	额度15000或者自定义		--> 注册功能
    实现购物商城,买东西自动加入购物车,调用信用卡接口结账		--> 购物车、支付功能
    可以提现,支付手续费5%	--> 提现
    支持多账户登录		--> 登录
    支持账户间转账		--> 转账,记录流水
    提供还款接口		--> 还款
    ATM记录操作日志	--> 日志功能
    提供管理接口,包括添加账户、用户额度、冻结账户...		--> 管理员功能
    用户认证功能		--> 登录认证,使用装饰器

#2.程序的架构设计
	用户功能层,接收用户输入,输出执行结果,还可以做小的逻辑判断
    接口层,业务逻辑的处理、日志、流水
    数据处理层,数据的处理(增删改查)
    
    #三层架构的好处
    	1.代码的结构更加清晰
        2.可扩展性强
        3.易于维护、管理

#3.分任务开发
	- CTO
    	- 技术总监
        	- 架构师
            	- 项目经理
                	- 普通开发
                    - UI:用户界面设计师
                    - 前端:网页的开发
                    - 后端:写业务逻辑、写接口
                    - 测试:测试软件
                    - 运维:部署项目
                    
#4.测试
	- 1.手动测试
    	传统人工去手动测试
    - 2.自动化测试
    	通过脚本模拟人的行为,自动化执行测试
    - 3.黑盒测试
    	对用户界面进行测试 
    - 4.白盒测试
    	对软件的性能进行测试,例如每分钟能接收多少并发量
        
#5.上线运行

实例

参考网站

启动文件

#创建项目目录ATM
#ATM/bin/start.py

import sys
import os
BASE_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_dir)
from core import src
if __name__ == '__main__':
    src.run()

配置文件

#ATM/conf/settings.py

import os
base_path = os.path.dirname(os.path.dirname(__file__))
db_path = os.path.join(base_path, 'db')
#不要使用%s拼接路径,要使用os模块
LOG_path = os.path.join(base_path,'log')

"""
logging配置
"""

# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' 
                  '[%(levelname)s][%(message)s]'  # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'

# 定义日志输出格式 结束
logfile_dir = os.path.join(base_path, 'log')  # log文件的目录
logfile_name = 'atm_shop_log.log'  # log文件名

# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
    os.mkdir(logfile_dir)

# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)

# log配置字典
LOGGING_DIC = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': standard_format
        },
        'simple': {
            'format': simple_format
        },
    },
    'filters': {},
    'handlers': {
        # 打印到终端的日志
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',  # 打印到屏幕
            'formatter': 'simple'
        },
        # 打印到文件的日志,收集info及以上的日志
        'default': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',  # 保存到文件
            'formatter': 'standard',
            'filename': logfile_path,  # 日志文件
            'maxBytes': 1024 * 1024 * 5,  # 日志大小5M
            'backupCount': 5,
            'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了
        },
    },
    'loggers': {
        # logging.getLogger(__name__)拿到的logger配置
        '': {
            'handlers': ['default', 'console'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
            'level': 'DEBUG',
            'propagate': True,  # 向上(更高level的logger)传递
        },
    },
}

核心文件

#ATM/core/src.py

import time
from ATM.interface import user_interface
from ATM.lib import comman
from ATM.interface import bank_interface
from ATM.interface import shop_interface
from ATM.interface import admin_interface

user_info = {
    'user':None,
}
#注册
def register():
    while True:
        user = input('请输入用户名>>>: ').strip()
        # 调用接口,判断用户是否存在
        flag = user_interface.check_user_interface(user)
        if flag:
            print('用户已存在,请重新输入!')
            continue
        pwd = input('请输入密码>>>: ').strip()
        re_pwd = input('请确认密码>>>: ').strip()
        if pwd == re_pwd:
            #调用接口,保存用户信息
            msg = user_interface.register_interface(user,pwd)
            if msg:
                print(msg)
                break
            else:
                print('注册失败')
        else:
            print('两次输入密码不一致')
#登录
def login():
    while True:
        user = input('请输入用户名>>>: ').strip()
        flag = user_interface.check_user_interface(user)
        if not flag:
            print('用户不存在,请重新输入!')
            continue
        pwd = input('请输入用户密码>>>: ').strip()
        flag,msg = user_interface.login_interface(user,pwd)
        if flag:
            print(msg)
            #登录成功后,做一个记录
            user_info['user'] = user
            break
        else:
            print(msg)
            break
#查看余额
@comman.login_auth
def check_balance():
    balabce = user_interface.check_balance_interface(user_info['user'])
    msg = f"{user_info['user']}余额为: "
    print(msg,balabce)
#提现
@comman.login_auth
def withdraw():
    while True:
        money = input('请输入提现金额>>>: ').strip()
        if money.isdigit():
            money = int(money)
            flag,msg = bank_interface.withdraw_interface(user_info['user'],money)
            if flag:
                print(msg)
                break
            else:
                print(msg)
        else:
            print('请输入数字')
#还款
@comman.login_auth
def pay_bak():
    while True:
        money = input('请输入还款金额>>>: ').strip()
        if not money.isdigit():
            print('请输入数字!')
            continue
        money = int(money)
        flag,msg = bank_interface.repay_interface(user_info['user'],money)
        if flag:
            print(msg)
            break
#转账
@comman.login_auth
def transfer():
    while True:
        to_user = input('请输入转账用户>>>: ').strip()
        flag = user_interface.check_user_interface(to_user)
        if not flag:
            print('目标用户不存在!')
            continue
        #输入转账金额
        money = input('请输入转账金额>>>: ').strip()
        if not money.isdigit():
            print('请输入数字')
            continue
        money = int(money)
        flag,msg = bank_interface.transfer_interface(to_user,user_info['user'],money)
        if flag:
            print(msg)
            break
        else:
            print(msg)
#查看流水
@comman.login_auth
def check_record():
    flow_list = bank_interface.check_flow_interface(user_info['user'])
    for flow in flow_list:
        print(flow)
#购物车功能
@comman.login_auth
def shopping_cart():
    good_list = [
        ['衬衫', 500],
        ['裤子', 300],
        ['鞋子', 150],
        ['外套', 1000]
    ]
    #用户余额
    user_balance = user_interface.check_balance_interface(user_info['user'])
    #购物车
    shop_cart = {}
    #清空购物车,用户需要支付的金额
    cost = 0
    while True:
        # 打印商品信息
        for index, good_price in enumerate(good_list, 1):
            print(index, good_price)
        # 选择商品编号或者输入q退出
        choice = input('请输入商品编号或输入q退出>>>: ').strip()
        if choice.isdigit():
            choice = int(choice)
            if choice >= 0 and choice <= len(good_list):
                good_name, good_price = good_list[choice - 1]
                if user_balance >= good_price:
                    if good_name in shop_cart:
                        shop_cart[good_name] += 1
                    else:
                        shop_cart[good_name] = 1
                    cost += good_price
                else:
                    print('余额不足,无法添加购物车!')
            else:
                print('商品编号不存在!')
        elif choice == 'q':
            commit = input('是否确认结账,请输入(y/n)').strip()
            if commit == 'y':
                # 调用商城支付功能,并调用银行支付接口
                flag, msg = shop_interface.buy_shop_interface(user_info['user'], cost)
                if flag:
                    print(msg)
                    break
                else:
                    print(msg)
            elif commit == 'n':
                # 调用添加购物车接口
                flag, msg = shop_interface.add_shop_cart_interface(user_info['user'], shop_cart)
                if flag:
                    print(msg)
                    break
                else:
                    print(msg)
        else:
            print('请输入商品编号或输入q')
#查看购物车
@comman.login_auth
def check_shop_cart():
    shop_cart = shop_interface.check_shop_cart_interface(user_info['user'])
    if not shop_cart:
        print('购物车为空')
    print(shop_cart)
#注销
@comman.login_auth
def logout():
    while True:
        flag,msg = user_interface.logout_interface(user_info['user'])
        if flag:
            print(msg)
            break
#管理员功能
def lock_user():
    while True:
        user = input('请输入需要冻结的账户>>>: ').strip()
        flag = user_interface.check_user_interface(user)
        if flag:
            msg = admin_interface.lock_interface(user)
            print(msg)
            break
        else:
            print('用户不存在,请重新输入!')
def unlock_user():
    while True:
        user = input('请输入需要解冻的账户>>>: ').strip()
        flag = user_interface.check_user_interface(user)
        if flag:
            msg = admin_interface.unlock_interface(user)
            print(msg)
            break
        else:
            print('用户不存在,请重新输入!')
def change_limit():
    while True:
        user = input('请输入需要修改额度的账户>>>: ').strip()
        flag = user_interface.check_user_interface(user)
        limit = input('请输入修改额度>>>: ').strip()
        if flag:
            if limit.isdigit():
                limit = input(limit)
                #调用修改额度接口
                msg = admin_interface.change_limit_interface(user,limit)
                print(msg)
                break
            else:
                print('输入额度无效,请重新输入')
                continue
        else:
            print('用户不存在,请重新输入!')
admin_func_dic = {
    '1':lock_user,
    '2':unlock_user,
    '3':change_limit,
}
@comman.login_auth
def admin_sys():
    while True:
        print('''
            1.冻结账户
            2.解冻账户
            3.用户额度
        ''')
        choice = input('请选择管理员功能>>>: ').strip()
        if choice.isdigit():
            if choice in admin_func_dic:
                admin_func_dic[choice]()
                break
            else:
                print('相关功能还在开发中...')
                continue
        else:
            print('输入无效,请重新输入!')
my_dict = {
    '1':register,
    '2':login,
    '3':check_balance,
    '4':withdraw,
    '5':pay_bak,
    '6':transfer,
    '7':check_record,
    '8':shopping_cart,
    '9':check_shop_cart,
    '10':logout,
    '11':admin_sys
}
def run():
    while True:
        print("""
        1  register(注册)
        2  login(登录)
        3  check_balance(查看余额)
        4  withdraw(提现)
        5  pay_bak(还款)
        6  transfer(转账)
        7  check_record(查看流水)
        8  shopping_cart(购物车功能)
        9  check_shop_cart(查看购物车)
        10 logout(注销)
        11 admin_sys(管理员功能)
        """)
        choice = input('please choice func to operate>>>: ').strip()
        if choice == 'q':
            break
        if choice not in my_dict:
            print('相关功能还在开发中')
            time.sleep(1)
            continue
        my_dict.get(choice)()

文件数据操作

# ATM/db/db_handler.py 

import json
import os
from ATM.conf import settings
#保存用户数据
def save(user_dic):
    user_path = f'{settings.db_path}/{user_dic["user"]}.json'
    with open(user_path, 'w', encoding='utf-8') as f:
        #中文不转码
        json.dump(user_dic, f,ensure_ascii=False)
        f.flush()
#查看用户数据
def select(user):
    user_path = f'{settings.db_path}/{user}.json'
    #判断用户文件是否存在
    if os.path.exists(user_path):
        with open(user_path,'r',encoding='utf-8') as f:
            user_dic = json.load(f)
            return user_dic

接口文件

用户相关接口

# ATM/interface/user_interface

from ATM.db import db_handler
from ATM.lib import comman
import os

user_logger = comman.get_logger('user')
#注册接口
def register_interface(user,pwd,balance=15000):
    pwd = comman.get_md5(pwd)
    user_dic = {
        'user': user,
        'pwd': pwd,
        'balance': balance,
        'flow':[],
        'shop_cart':{},
        'lock':False
    }
    db_handler.save(user_dic)
    user_logger.info(f'{user_dic["user"]}用户注册成功!')
    return f'{user_dic["user"]}用户注册成功!'
#查看用户接口
def check_user_interface(user):
    #调用数据处理层的select函数,查看用户信息
    user_dic = db_handler.select(user)
    if user_dic:
        return True
#登录接口
def login_interface(user,pwd):
    pwd = comman.get_md5(pwd)
    user_dic = db_handler.select(user)
    if user_dic['lock']:
        user_logger.info(f'用户{user}已经被冻结!')
        return False,f'用户{user}已经被冻结!'
    if user_dic['pwd'] == pwd:
        user_logger.info(f'用户{user}登录成功')
        return True,f'用户{user}登录成功'
    user_logger.info(f'用户{user}登录失败,密码错误')
    return False,f'用户{user}登录失败,密码错误'
#查看余额接口
def check_balance_interface(user):
    user_dic = db_handler.select(user)
    return user_dic['balance']
#注销接口
def logout_interface(user):
    from ATM.core import src
    src.user_info['user'] = None
    user_logger.info(f'用户{user}注销成功!欢迎下次使用')
    return True,f'用户{user}注销成功!欢迎下次使用'

商店相关接口

from ATM.db import db_handler
from ATM.interface import bank_interface
from ATM.lib import comman

user_logger = comman.get_logger('shop')
#商城结账接口
def buy_shop_interface(user,cost):
    flag,msg = bank_interface.pay_interface(user,cost)
    if flag:
        user_logger.info(f'用户{user}购物成功,余额为{msg}')
        return True,f'购物成功,余额为{msg}'
    else:
        user_logger.info(f'用户{user}余额为{msg},余额不足,购物失败!')
        return False,f'余额为{msg},余额不足,购物失败!'
#添加购物车接口
def add_shop_cart_interface(user,shop_cart):
    user_dic = db_handler.select(user)
    old_cart = user_dic['shop_cart']
    #循环当前购物车
    for shop in shop_cart:
        if shop in old_cart:
            #把回购的商品信息添加到原来的购物车
            old_cart[shop] += shop_cart[shop]
        else:
            #把新商品添加到购物车
            old_cart[shop] = shop_cart[shop]
    user_dic['shop_cart'].update(old_cart)
    db_handler.save(user_dic)
    user_logger.info(f'用户{user}添加购物车成功')
    return True,'添加购物车成功'
#查看购物车接口
def check_shop_cart_interface(user):
    user_dic = db_handler.select(user)
    user_logger.info(f'用户{user}查看购物车')
    return user_dic['shop_cart']

银行相关接口

from ATM.db import db_handler
from ATM.lib import comman

user_logger = comman.get_logger('bank')
#提现接口
def withdraw_interface(user,money):
    user_dic = db_handler.select(user)
    if user_dic['balance'] >= money*1.05:
        user_dic['balance']-=money*1.05
        msg = f'用户{user}提现[{money}]元成功!'
        user_dic['flow'].append(msg)
        db_handler.save(user_dic)
        user_logger.info(f'用户{user}提现[{money}]元成功!')
        return True,msg
    user_logger.info(f'用户{user}提现失败,余额不足...')
    return False,'余额不足...'
#还款接口
def repay_interface(user,money):
    user_dic = db_handler.select(user)
    user_dic['balance'] +=money
    msg = f'用户{user},还款{money}成功'
    user_dic['flow'].append(msg)
    db_handler.save(user_dic)
    user_logger.info(f'用户{user},还款{money}成功')
    return True,msg
#转账接口
def transfer_interface(to_user,from_user,money):
    to_user_dic = db_handler.select(to_user)
    from_user_dic = db_handler.select(from_user)
    if from_user_dic['balance'] >=money:
        #金额操作
        from_user_dic['balance'] -=money
        to_user_dic['balance'] +=money
        #拼接流水
        to_user_flow = f'{to_user}用户接受{from_user}用户转账{money}元'
        from_user_flow = f'{from_user}用户给{to_user}用户转账{money}元'
        #记录流水
        from_user_dic['flow'].append(from_user_flow)
        to_user_dic['flow'].append(to_user_flow)
        #保存用户余额信息
        db_handler.save(from_user_dic)
        db_handler.save(to_user_dic)
        return True,from_user_flow
    user_logger.info(f'用户{from_user}转账失败,余额不足,请充值!')
    return False,'余额不足,请充值!'
#查看流水接口
def check_flow_interface(user):
    user_dic = db_handler.select(user)
    user_logger.info(f'用户{user}查看了银行流水')
    return user_dic['flow']
#银行支付接口
def pay_interface(user,cost):
    user_dic = db_handler.select(user)
    if user_dic['balance'] >= cost:
        user_dic['balance'] -=cost
        user_dic['flow'].append(f'{user}用户支付{cost}成功')
        db_handler.save(user_dic)
        user_logger.info(f'用户{user}支付{cost}成功')
        return True,user_dic['balance']
    else:
        user_logger.info(f'用户{user}支付{cost}失败')
        return False,user_dic['balance']
if __name__ == '__main__':
    res = pay_interface('syy',1)
    print(res)

管理员相关接口

from ATM.db import db_handler
from ATM.lib import comman

user_logger = comman.get_logger('admin')
#冻结账户接口
def lock_interface(user):
    user_dic = db_handler.select(user)
    user_dic['lock'] = True
    db_handler.save(user_dic)
    user_logger.info(f'用户{user}冻结成功')
    return f'用户{user}冻结成功'
#解冻账户接口
def unlock_interface(user):
    user_dic = db_handler.select(user)
    user_dic['lock'] = False
    db_handler.save(user_dic)
    user_logger.info(f'用户{user}解冻成功')
    return f'用户{user}解冻成功'
#修改余额接口
def change_limit_interface(user,limit):
    user_dic = db_handler.select(user)
    user_dic['balance'] = limit
    db_handler.save(user_dic)
    user_logger.info(f'用户{user}修改余额成功')
    return f'用户{user}修改额度成功'

通用文件

#ATM/lib/comman.py

import hashlib
import logging.config
from ATM.conf import settings

#登录认证
def login_auth(func):
    from core import src
    def inner(*args,**kwargs):
        #判断用户是否登录,是则执行被装饰函数,否则要求登录
        if src.user_info['user']:
            res = func(*args,**kwargs)
            return res
        else:
            print('请先登录...')
            src.login()
    return inner

#md5加密
def get_md5(pwd):
    val = '加盐'
    md5 = hashlib.md5()
    md5.update(val.encode('utf-8'))
    md5.update(pwd.encode('utf-8'))
    res = md5.hexdigest()
    return res

#日志功能
def get_logger(type_name):
    logging.config.dictConfig(settings.LOGGING_DIC)
    logger = logging.getLogger(type_name)
    return logger

if __name__ == '__main__':
    logger = get_logger('syy')
    logger.info('哈哈哈哈哈哈哈')
原文地址:https://www.cnblogs.com/syy1757528181/p/14059494.html