ATM + 购物车

ATM + 购物车


conf存放配置文件

import os
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
# 拼接db目录路径
DB_PATH = os.path.join(BASE_PATH, 'db')

"""
logging配置
"""

# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]'
                 '[%(levelname)s][%(message)s]'  # 其中name为getlogger指定的名字

simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'

id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'

# 定义日志输出格式 结束
logfile_dir = os.path.join(BASE_PATH, 'log')  # log文件的目录

logfile_name = 'atm_shop_log.log'  # log文件名

# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
   os.mkdir(logfile_dir)

# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)

# log配置字典
LOGGING_DIC = {
   'version': 1,
   'disable_existing_loggers': False,
   'formatters': {
       'standard': {
           'format': standard_format
      },
       'simple': {
           'format': simple_format
      },
  },
   'filters': {},
   'handlers': {
       # 打印到终端的日志
       'console': {
           'level': 'DEBUG',
           'class': 'logging.StreamHandler',  # 打印到屏幕
           'formatter': 'simple'
      },
       # 打印到文件的日志,收集info及以上的日志
       'default': {
           'level': 'DEBUG',
           'class': 'logging.handlers.RotatingFileHandler',  # 保存到文件
           'formatter': 'standard',
           'filename': logfile_path,  # 日志文件
           'maxBytes': 1024 * 1024 * 5,  # 日志大小5M
           'backupCount': 5,
           'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了
      },
  },
   'loggers': {
       # logging.getLogger(__name__)拿到的logger配置
       '': {
           'handlers': ['default', 'console'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
           'level': 'DEBUG',
           'propagate': True,  # 向上(更高level的logger)传递
      },
  },
}

core核心业务


 

import os
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
# 拼接db目录路径
DB_PATH = os.path.join(BASE_PATH, 'db')

"""
logging配置
"""

# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]'
                 '[%(levelname)s][%(message)s]'  # 其中name为getlogger指定的名字

simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'

id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'

# 定义日志输出格式 结束
logfile_dir = os.path.join(BASE_PATH, 'log')  # log文件的目录

logfile_name = 'atm_shop_log.log'  # log文件名

# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
   os.mkdir(logfile_dir)

# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)

# log配置字典
LOGGING_DIC = {
   'version': 1,
   'disable_existing_loggers': False,
   'formatters': {
       'standard': {
           'format': standard_format
      },
       'simple': {
           'format': simple_format
      },
  },
   'filters': {},
   'handlers': {
       # 打印到终端的日志
       'console': {
           'level': 'DEBUG',
           'class': 'logging.StreamHandler',  # 打印到屏幕
           'formatter': 'simple'
      },
       # 打印到文件的日志,收集info及以上的日志
       'default': {
           'level': 'DEBUG',
           'class': 'logging.handlers.RotatingFileHandler',  # 保存到文件
           'formatter': 'standard',
           'filename': logfile_path,  # 日志文件
           'maxBytes': 1024 * 1024 * 5,  # 日志大小5M
           'backupCount': 5,
           'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了
      },
  },
   'loggers': {
       # logging.getLogger(__name__)拿到的logger配置
       '': {
           'handlers': ['default', 'console'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
           'level': 'DEBUG',
           'propagate': True,  # 向上(更高level的logger)传递
      },
  },
}

db数据处理


 

import os
import json
from conf import settings


# 保存数据
def save(user_dic):

   # 拼接用户保存文件
   user_path = f'{settings.DB_PATH}/{user_dic["user"]}.json'

   # 把用户信息写入文件中
   with open(user_path, 'w', encoding='utf-8') as f:
       json.dump(user_dic, f, ensure_ascii=False)
       f.flush()


# 查看用户数据
def select(user):
   # base_path = os.path.dirname(os.path.dirname(__file__))
   # db_path = os.path.join(base_path, 'db')

   user_path = f'{settings.DB_PATH}/{user}.json'

   # 判断用户文件是否存在
   if os.path.exists(user_path):

       # 若存在, 读取用户信息
       with open(user_path, 'r', encoding='utf-8') as f:

           user_dic = json.load(f)

           return user_dic

interface接口逻辑处理


import os
import json
from conf import settings


# 保存数据
def save(user_dic):

   # 拼接用户保存文件
   user_path = f'{settings.DB_PATH}/{user_dic["user"]}.json'

   # 把用户信息写入文件中
   with open(user_path, 'w', encoding='utf-8') as f:
       json.dump(user_dic, f, ensure_ascii=False)
       f.flush()


# 查看用户数据
def select(user):
   # base_path = os.path.dirname(os.path.dirname(__file__))
   # db_path = os.path.join(base_path, 'db')

   user_path = f'{settings.DB_PATH}/{user}.json'

   # 判断用户文件是否存在
   if os.path.exists(user_path):

       # 若存在, 读取用户信息
       with open(user_path, 'r', encoding='utf-8') as f:

           user_dic = json.load(f)

           return user_dic

 

管理员接口

from db import db_handler
from lib import common

admin_logger = common.get_logger('admin')

# 冻结用户接口
def lock_interface(user):
   user_dic = db_handler.select(user)

   user_dic['lock'] = True

   db_handler.save(user_dic)
   return f'{user}用户冻结成功!'

def unlock_interface(user):
   user_dic = db_handler.select(user)

   user_dic['lock'] = False

   db_handler.save(user_dic)
   return f'{user}用户解冻成功!'


# 修改额度接口
def change_limit_interface(user, limit):
   user_dic = db_handler.select(user)
   user_dic['balance'] = limit
   db_handler.save(user_dic)
   return f'修改用户[{user}]额度成功!'

银行接口

from db import db_handler
from lib import common

bank_logger = common.get_logger('bank')

# 提现接口
def withdraw_interface(user, money):

   user_dic = db_handler.select(user)

   if user_dic['balance'] >= money * 1.05:

       user_dic['balance'] -= money * 1.05

       msg = f'用户[{user}] 提现[{money}]元成功!'

       user_dic['flow'].append(msg)

       db_handler.save(user_dic)

       return True, msg

   return False, '余额不足!'


# 还款接口
def repay_interface(user, money):
   user_dic = db_handler.select(user)

   user_dic['balance'] += money

   msg = f'用户{user}, 还款{money}元成功!'
   user_dic['flow'].append(msg)

   db_handler.save(user_dic)

   return True, msg


# 转账接口
def transfer_interface(to_user, from_user, money):

   to_user_dic = db_handler.select(to_user)

   from_user_dic = db_handler.select(from_user)

   if from_user_dic['balance'] >= money:

       # 加减钱操作
       from_user_dic['balance'] -= money

       to_user_dic['balance'] += money

       # 拼接流水信息
       to_user_flow = f'{to_user}用户接收到用户{from_user}转账{money}元'
       from_user_flow = f'{from_user}用户给到用户{to_user}转账{money}元'

       # 记录流水
       from_user_dic['flow'].append(from_user_flow)
       to_user_dic['flow'].append(to_user_flow)

       # 保存用户信息
       db_handler.save(from_user_dic)
       db_handler.save(to_user_dic)

       return True, from_user_flow

   return False, '余额不足,请充值!'


# 查看流水接口
def check_flow_interface(user):
   user_dic = db_handler.select(user)

   return user_dic['flow']


# 银行支付接口
def pay_interface(user, cost):
   user_dic = db_handler.select(user)

   if user_dic['balance'] >= cost:

       user_dic['balance'] -= cost

       user_dic['flow'].append(f'{user}用户支付{cost}成功!')

       db_handler.save(user_dic)

       return True

   else:
       return False

购物车接口

from interface import bank_interface
from db import db_handler
from lib import common

shop_logger = common.get_logger('shop')

# 商城结账接口
def buy_shop_interface(user, cost):

   flag = bank_interface.pay_interface(user, cost)

   if flag:

       return True, '购物成功!'

   else:
       return False, '余额不足,支付失败!'


# 添加购物车接口
def add_shop_cart_interface(user, shop_cart):
   user_dic = db_handler.select(user)

   old_cart = user_dic['shop_cart']

   # 循环当前购物车
   for shop in shop_cart:

       if shop in old_cart:
           # 获取当前购物车中的商品数量
           number = shop_cart[shop]

           # 给用户信息中的商品数量做 增值运算
           old_cart[shop] += number

       else:
           # 获取当前购物车中的商品数量
           number = shop_cart[shop]

           old_cart[shop] = number

   user_dic['shop_cart'].update(old_cart)
   db_handler.save(user_dic)
   return True, '添加购物车成功!'


# 查看购物车接口'
def check_shop_cart_interface(user):
   user_dic = db_handler.select(user)

   return user_dic['shop_cart']

用户接口

from db import db_handler
from lib import common
from lib import common

user_logger = common.get_logger('user')

# 查看用户接口
def check_user_interface(user):

   # 调用数据处理层的select函数,查看用户信息
   user_dic = db_handler.select(user)

   if user_dic:
       return True


# 注册接口
def register_interface(user, pwd, balance=15000):

   pwd = common.get_md5(pwd)

   user_dic = {
       'user': user,
       'pwd': pwd,
       'balance': balance,
       'flow': [],
       'shop_cart': {},
       'lock': False
  }

   # 调用保存数据功能
   db_handler.save(user_dic)

   user_logger.info(f'{user_dic["user"]}用户注册成功!')

   return f'{user_dic["user"]}用户注册成功!'


# 登陆接口
def login_interface(user, pwd):

   user_dic = db_handler.select(user)

   pwd = common.get_md5(pwd)

   if user_dic['lock']:
       user_logger.info(f'用户[{user}]已被冻结,请联系管理员!')

       return False, '用户已被冻结,请联系管理员!'

   if user_dic['pwd'] == pwd:
       user_logger.info(f'{user}登陆成功!')
       return True, f'{user}登陆成功!'

   user_logger.info(f'{user}输入密码错误!')
   return False, f'{user}密码错误!'


# 查看余额接口
def check_bal_interface(user):
   user_dic = db_handler.select(user)
   return user_dic['balance']


# 注销接口
def logout_interface():
   from core import src
   user = src.user_info['user']
   src.user_info['user'] = None

   return True, f'{user}用户注销成功!欢迎官人下次再来!'



lib公共文件


import hashlib
import logging.config
from conf import settings

# MD5加密功能
def get_md5(pwd):
   val = '天王盖地虎'
   md5 = hashlib.md5()
   md5.update(pwd.encode('utf-8'))
   md5.update(val.encode('utf-8'))
   res = md5.hexdigest()
   return res


# 登陆认证功能
def login_auth(func):
   from core import src
   def inner(*args, **kwargs):
       # 判断用户存在, 则执行被装饰函数
       if src.user_info['user']:
           res = func(*args, **kwargs)

           return res

       # 否则,执行登陆功能
       else:
           print('请先登陆')
           src.login()

   return inner


# 日志功能
def get_logger(type_name):
   logging.config.dictConfig(settings.LOGGING_DIC)
   logger = logging.getLogger(type_name)
   return logger


if __name__ == '__main__':
   res = get_md5('123')
   print(res)
   logger = get_logger('tank')

log日志


start.py启动文件

import os
import sys

BASE_PATH = os.path.dirname(__file__)

sys.path.append(BASE_PATH)

from core import src

if __name__ == '__main__':
   src.run()

 

 

原文地址:https://www.cnblogs.com/zhangjinyi97/p/11929605.html