# 仿mysql命令行 (a MySQL-style command-line emulator)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# by wk

import os


# 格式化输出
def format_print(seled_columns, print_list):
    """Print query results as a plain-text table on stdout.

    The header line is the column names separated by single spaces, followed
    by a dashed rule made of one ``'- - - - - '`` segment per column. Each row
    is then printed on its own line with its values separated by two spaces.

    Args:
        seled_columns: Sequence of column names, in display order.
        print_list: Iterable of dicts; each must contain every name listed
            in ``seled_columns``.

    Raises:
        KeyError: If a row lacks one of the selected columns.
    """
    print(*seled_columns)
    print('- - - - - ' * len(seled_columns))
    for p_dict in print_list:
        # One line per row; join replaces the manual "is this the last
        # column?" counter that chose between '  ' and a newline terminator.
        print('  '.join(str(p_dict[col]) for col in seled_columns))


# 把文本文件转换成列表
def table_list():
    """Load ``staff_table.txt`` from the current directory into a list of dicts.

    Each non-blank line is split on commas and its first six fields are mapped,
    in order, to the keys ``staff_id``, ``name``, ``age``, ``phone``, ``dept``
    and ``enroll_date``. All values are kept as strings. Fields beyond the
    sixth are ignored.

    Returns:
        A list with one dict per data line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        IndexError: If a non-blank line has fewer than six fields.
    """
    columns = ('staff_id', 'name', 'age', 'phone', 'dept', 'enroll_date')
    my_table_list = []
    with open('staff_table.txt', 'r', encoding='utf-8') as f:
        for line in f:
            line = line.rstrip('\n')
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing empty line) instead of
                # failing with IndexError on the missing fields.
                continue
            fields = line.split(',')
            my_table_list.append(
                {column: fields[index] for index, column in enumerate(columns)}
            )
    return my_table_list


# 查询模块
def select(user_sql, my_table_list):
    """Execute a ``select`` statement against the staff rows and print the result.

    Supported forms (``user_sql`` is the whitespace-split statement)::

        select <cols|*> from staff_table
        select <cols|*> from staff_table where <col> <op> <value>

    ``<cols>`` is a comma-separated list without spaces. ``<op>`` is one of
    ``>``, ``<``, ``=``, ``>=``, ``<=`` or ``like`` (substring match). The
    value may be wrapped in single or double quotes and may contain spaces.

    Ordering operators compare numerically when both the cell and the value
    parse as numbers (so ``age > 9`` behaves as expected); otherwise they fall
    back to string comparison. ``=`` is always an exact string match.

    Args:
        user_sql: List of statement tokens; index 1 is the column list,
            index 3 the table name, indices 5/6/7+ the where clause.
        my_table_list: List of row dicts keyed by column name, with string
            values.

    Returns:
        None. Results and error messages are written to stdout.
    """
    import operator  # function-scope import keeps this block self-contained

    all_columns = ('staff_id', 'name', 'age', 'phone', 'dept', 'enroll_date')
    ordering_ops = {
        '>': operator.gt,
        '<': operator.lt,
        '>=': operator.ge,
        '<=': operator.le,
    }
    syntax_error = 'ERROR 1064 (42000): You have an error in your SQL syntax;'

    def row_matches(cell, symbol, value):
        """Return True when ``cell <symbol> value`` holds."""
        if symbol == '=':
            return cell == value
        if symbol == 'like':
            return value in cell
        try:
            # Compare as numbers when possible; plain string ordering would
            # rank '9' above '22'.
            left, right = float(cell), float(value)
        except ValueError:
            left, right = cell, value
        return ordering_ops[symbol](left, right)

    if len(user_sql) < 4:
        print(syntax_error)
        return
    if user_sql[3] != 'staff_table':
        print("ERROR 1146 (42S02): Table '%s' doesn't exist" % user_sql[3])
        return
    if 4 < len(user_sql) < 8:
        # A where clause needs at least: where <col> <op> <value>.
        print(syntax_error)
        return

    if user_sql[1] == '*':
        seled_columns = all_columns
    else:
        seled_columns = user_sql[1].split(',')

    try:
        if len(user_sql) == 4:
            matched = my_table_list
        else:
            compare_column = user_sql[5]
            compare_symbol = user_sql[6]
            # Re-join the remaining tokens so quoted values with spaces
            # (e.g. "Alex Li") survive the whitespace split.
            compare_value = ' '.join(user_sql[7:]).strip('"\'')
            if compare_symbol not in ordering_ops and compare_symbol not in ('=', 'like'):
                # Report an unknown operator once rather than once per row.
                print(syntax_error)
                return
            matched = [
                row for row in my_table_list
                if row_matches(row[compare_column], compare_symbol, compare_value)
            ]
        print_list = [{col: row[col] for col in seled_columns} for row in matched]
    except KeyError:
        # Unknown column in the select list or the where clause.
        print(syntax_error)
        return

    format_print(seled_columns, print_list)
    print('%s rows in set (0.00 sec)' % len(print_list))


# 添加模块
def insert(user_sql, my_table_list):
    """Execute an ``insert`` statement and persist the table.

    Expected form (``user_sql`` is the whitespace-split statement)::

        insert into staff_table values (<name>,<age>,<phone>,<dept>,<enroll_date>)

    Exactly five comma-separated values are required. Each may be wrapped in
    single or double quotes, may contain spaces, and may be followed by a
    space after the comma. ``phone`` acts as the unique key: if an existing
    row has the same phone, nothing is inserted. The new ``staff_id`` is one
    more than the largest existing id, or 1 for an empty table.

    Args:
        user_sql: List of statement tokens; index 2 is the table name and
            everything from index 4 on is the parenthesised value list.
        my_table_list: List of row dicts; the new row is appended to it in
            place before the table is written out via ``cope_file``.

    Returns:
        None. Status and error messages are written to stdout.

    Raises:
        ValueError: If an existing row's ``staff_id`` is not an integer.
        OSError: If writing the table file fails.
    """
    syntax_error = 'ERROR 1064 (42000): You have an error in your SQL syntax;'

    if len(user_sql) < 3:
        print(syntax_error)
        return
    if user_sql[2] != 'staff_table':
        print("ERROR 1146 (42S02): Table '%s' doesn't exist" % user_sql[2])
        return
    if len(user_sql) < 5:
        print(syntax_error)
        return

    # Re-join the tokens after "values": a value such as 'Alex Li' or a space
    # after a comma would otherwise split the value list across tokens.
    raw_values = ' '.join(user_sql[4:]).strip().lstrip('(').rstrip(')')
    values = [value.strip().strip('\'"') for value in raw_values.split(',')]
    if len(values) != 5:
        # Too few values cannot fill a row; extras would be silently dropped.
        print(syntax_error)
        return
    name, age, phone, dept, enroll_date = values

    if any(row['phone'] == phone for row in my_table_list):
        print("ERROR 1062 (23000): Duplicate entry '%s' for key 'PRIMARY'" % phone)
        return

    # default=0 lets the first insert into an empty table get staff_id 1.
    staff_id_new = max((int(row['staff_id']) for row in my_table_list), default=0) + 1
    my_table_list.append({
        'staff_id': str(staff_id_new),
        'name': name,
        'age': age,
        'phone': phone,
        'dept': dept,
        'enroll_date': enroll_date,
    })
    cope_file(my_table_list)
    print('Query OK, 1 row affected (0.01 sec)')


# 更新模块
def update(user_sql, my_table_list):
    """Execute an ``update`` statement and persist the table.

    Expected form (``user_sql`` is the whitespace-split statement)::

        update staff_table set <col> = <value> where <col> = <value>

    Both values may be wrapped in single or double quotes and may contain
    spaces. Every row whose where-column equals the where-value (exact string
    match) gets its set-column replaced with the new value.

    Args:
        user_sql: List of statement tokens; index 1 is the table name, index 3
            the column to set, index 5 up to the ``where`` keyword the new
            value, and the tokens after ``where`` the filter.
        my_table_list: List of row dicts; matching rows are modified in place
            before the table is written out via ``cope_file``.

    Returns:
        None. Status and error messages are written to stdout.

    Raises:
        OSError: If writing the table file fails.
    """
    syntax_error = 'ERROR 1064 (42000): You have an error in your SQL syntax;'

    if len(user_sql) < 2:
        print(syntax_error)
        return
    if user_sql[1] != 'staff_table':
        print("ERROR 1146 (42S02): Table '%s' doesn't exist" % user_sql[1])
        return

    # Locate "where" instead of assuming index 6, so a new value containing
    # spaces does not shift the filter tokens.
    where_at = next(
        (i for i in range(6, len(user_sql)) if user_sql[i].lower() == 'where'),
        None,
    )
    if where_at is None or len(user_sql) < where_at + 4:
        print(syntax_error)
        return

    set_column = user_sql[3]
    new_value = ' '.join(user_sql[5:where_at]).strip('"\'')
    where_column = user_sql[where_at + 1]
    where_value = ' '.join(user_sql[where_at + 3:]).strip('"\'')

    # Validate both columns before touching any row so a bad statement cannot
    # leave the table half-updated or add an unknown key to a row.
    if any(set_column not in row or where_column not in row for row in my_table_list):
        print(syntax_error)
        return

    count = 0
    for row in my_table_list:
        if row[where_column] == where_value:
            # Assign to the SET column, not the WHERE column.
            row[set_column] = new_value
            count += 1
    cope_file(my_table_list)
    print('Query OK, %s row affected (0.02 sec)\nRows matched: %s  Changed: %s  Warnings: 0' % (
        count, count, count))

# 删除模块
def delete(user_sql, my_table_list):
    """Execute a ``delete`` statement and persist the table.

    Expected form (``user_sql`` is the whitespace-split statement)::

        delete from staff_table where <col> = <value>

    The value may be wrapped in single or double quotes and may contain
    spaces. Every row whose column equals the value (exact string match) is
    removed. The table file is rewritten only when at least one row matched.

    Args:
        user_sql: List of statement tokens; index 2 is the table name, index 4
            the filter column and everything from index 6 on the filter value.
        my_table_list: List of row dicts. It is not modified; the surviving
            rows are written out via ``cope_file``.

    Returns:
        None. Status and error messages are written to stdout.

    Raises:
        OSError: If writing the table file fails.
    """
    syntax_error = 'ERROR 1064 (42000): You have an error in your SQL syntax;'

    if len(user_sql) < 3:
        print(syntax_error)
        return
    if user_sql[2] != 'staff_table':
        print("ERROR 1146 (42S02): Table '%s' doesn't exist" % user_sql[2])
        return
    if len(user_sql) < 7:
        print(syntax_error)
        return

    compare_column = user_sql[4]
    # Strip quotes and re-join split tokens, matching how select/update
    # interpret their where-values.
    compare_value = ' '.join(user_sql[6:]).strip('"\'')
    try:
        del_list = [row for row in my_table_list if row[compare_column] != compare_value]
    except KeyError:
        # Unknown filter column.
        print(syntax_error)
        return

    removed = len(my_table_list) - len(del_list)
    if removed:
        cope_file(del_list)
        # Report the real number of removed rows rather than a fixed "1".
        print('Query OK, %s row affected (0.02 sec)' % removed)
    else:
        print('error! no data effect!')


# 把列表转换成文本文件
def cope_file(staff_list):
    """Write the staff rows to ``staff_table.txt`` in the current directory.

    Rows are first written to ``staff_table_new.txt`` as comma-separated
    lines in the column order ``staff_id, name, age, phone, dept,
    enroll_date``. That file is then moved over ``staff_table.txt``.

    Args:
        staff_list: Iterable of row dicts containing the six columns above,
            with string values.

    Raises:
        KeyError: If a row lacks one of the six columns.
        TypeError: If a column value is not a string.
        OSError: If the file cannot be written or replaced.
    """
    columns = ('staff_id', 'name', 'age', 'phone', 'dept', 'enroll_date')
    with open('staff_table_new.txt', 'w', encoding='utf8') as f:
        f.writelines(
            ','.join(staff_colunm_dict[column] for column in columns) + '\n'
            for staff_colunm_dict in staff_list
        )
    # os.replace overwrites the target in one step, so there is no window in
    # which the table file is missing, and it also works when no table file
    # exists yet (os.remove would raise FileNotFoundError).
    os.replace('staff_table_new.txt', 'staff_table.txt')


# 主调度模块
def main():
    """Run the interactive prompt loop.

    Reads one statement per line, strips a single trailing ``;``, and
    dispatches on the first word (case-insensitive) to ``select``, ``insert``,
    ``update`` or ``delete``. The staff table is re-read from disk via
    ``table_list`` immediately before each dispatched statement, so every
    handler sees the current file contents. ``exit`` or ``quit`` (with or
    without ``;``) and end-of-input both leave the loop after printing
    ``Bye``.
    """
    cmd_dic = {
        'select': select,
        'insert': insert,
        'update': update,
        'delete': delete,
    }
    print('''
Welcome to the WKSQL monitor.
    ''')
    while True:
        try:
            sql = input('(root@localhost [staff_db]) ').strip()
        except EOFError:
            # Ctrl-D / closed stdin: leave cleanly instead of a traceback.
            print('Bye')
            break
        if sql.endswith(';'):
            # Drop the statement terminator before tokenising so that a ';'
            # separated by a space does not leave an empty trailing token.
            sql = sql[:-1].rstrip()
        if not sql:
            continue
        cmd_info = sql.split()
        cmd = cmd_info[0].lower()
        if cmd in ('exit', 'quit') and len(cmd_info) == 1:
            print('Bye')
            break
        handler = cmd_dic.get(cmd)
        if handler is None:
            print('cmd not exists')
            continue
        handler(cmd_info, table_list())


if __name__ == '__main__':  # script entry point
    main()
# 原文地址 (original article): https://www.cnblogs.com/godspeed034/p/7245638.html