数据分析 -- 常用代码 库

import csv
import pymysql

from openpyxl import load_workbook


# 读取现有Excel
class MyFuncRreadExcel(object):
    def __init__(self):
        self.mysqlConfig = {
            "host": "127.0.0.1",
            "user": "root",
            "password": "!@#$qwerasdfzxcv",
            "database": "dbforpymysql"
        }

    # 读取Excel文件,获取 工作表 最大行数 最大列数
    def Read_excel(self, xls_path):
        '''
        :param xls_path: excel path
        :return: {'sheet': sheet, 'row_max': row_max, 'column_max': column_max} 当前工作表对象| 最大行数| 最大列数
        '''
        # 打开一个Excel
        try:
            wb = load_workbook(xls_path)
            print('打开 %s 成功!' % xls_path)
        except Exception as e:
            print('打开 %s 失败!' % xls_path)
            raise e

        try:
            # 获取当前工作表
            sheet = wb.active
            print('打开工作表成功!')
        except Exception as e:
            print('打开sheet失败!')
            raise e

        # 获取表的最大行数
        row_max = sheet.max_row
        print('
商品表的最大行数为:', row_max)

        # 获取表格的最大列数
        column_max = sheet.max_column
        print('
商品表的最大列数为:', column_max)

        result = {
            'sheet': sheet,
            'row_max': row_max,
            'column_max': column_max,
        }

        return result

    # 数据处理函数 处理每行中所有列的数据
    def process_perRow(self, sheet, row_max, process_row_min=None, process_row_max=None):
        '''
        数据处理函数 处理每行中所有列的数据
        :param sheet: 需要处理的工作表 类型:对象
        :param row_max: excel 文件的最大行数
        :param process_row_max: 处理文件的最大行数,默认为None,代表总行数
        :param process_row_min: 处理文件的最小行数,默认为None,代表从第一行开始处理
        :return: 暂时返回True,没有含义
        '''
        # 函数处理结果,有备无患
        res = True

        # 参数格式化
        # 最大行数
        if not process_row_max:
            process_row_max = row_max

        # 最小行数
        if not process_row_min:
            process_row_min = 1

        # 处理行数计数
        row_count = 0

        # 获取每行信息
        for row in sheet.rows:
            row_count += 1

            # 判断行数是否在 设定范围内
            if process_row_min <= row_count <= process_row_max:
                print('正在处理第 %d 行数据:
' % row_count)

                # 打印行内容
                print(row)

                # 处理列数计数
                cell_count = 0

                # 获取此行 每列的信息
                for cell in row:
                    cell_count += 1
                    print('    正在处理第 %d 行 %d 列 的数据
' % (row_count, cell_count))

                    # 获取 列值
                    cell_value = cell.value

                    # 打印 列值
                    print('        第 %d 行 %d 列 的数据为:' % (row_count, cell_count), cell_value, '
')

                    # 此处填写 具体的处理逻辑

                    print('    结束处理第 %d 行 %d 列 的数据
' % (row_count, cell_count))

                print('结束处理第 %d 行数据
' % row_count)

        return res

    # 数据处理函数 处理每行中特定的某几列数据
    def process_Selected_column(self, sheet, row_max, column_index, process_row_min=None, process_row_max=None):
        '''

        :param sheet: 需要处理的工作表 类型:对象
        :param row_max: excel 文件的最大行数
        :param process_row_max: 处理文件的最大行数,默认为None,代表总行数
        :param process_row_min: 处理文件的最小行数,默认为None,代表从第一行开始处理
        :return: [
                    [columnX_value, columnX_value, columnX_value, ... columnX_value],   # 行
                    [columnX_value, columnX_value, columnX_value, ... columnX_value],
                ]
        '''
        # 函数处理结果:
        result = []

        # 参数格式化
        # 最大行数
        if not process_row_max:
            process_row_max = row_max

        # 最小行数
        if not process_row_min:
            process_row_min = 1

        # 处理行数计数
        row_count = 0

        # 获取每行信息
        for row in sheet.rows:
            row_count += 1

            # 判断行数是否在 设定范围内
            if process_row_min <= row_count <= process_row_max:
                print('正在处理第 %d 行数据:
' % row_count)

                # 打印行内容
                # print(row)

                # 保存每行特定列的结果
                lst = []

                # 根据要求 处理特定的几列 数据
                for item in column_index:
                    item -= 1

                    lst.append(row[item].value)

                # 每行的结果添加到result
                result.append(lst)

                print('结束处理第 %d 行数据
' % row_count)

        return result

    # csv保存文件
    def save2csv(self, filePath, data):
        '''

        :param filePath: 要保存的文件 路径+文件名
        :param data: 需要保存的数据 格式:[[xxx, xxx, xxx, xxx], [xxx, xxx, xxx, xxx]]
        :return: 无返回值
        '''
        # 多行写入(writerows([(),(),()]
        # newline = '':在Windows中默认会添加1个空行,所以加入newline = ''参数,避免空行出现
        with open(filePath, 'w', newline='', encoding='utf8') as f:
            writer = csv.writer(f)
            writer.writerows(data)

            print(filePath, '写入完毕!')

    # 数据库 操作 pymysql
    # 数据库基础:连接 关闭数据库
    def MySql_operation(self):
        pass
    
    # 主函数
    def main(self, filePath):
        # 调用入口函数, 获取字典 sheet row_max column_max
        result_sheet_dict = MyFuncRreadExcel.Read_excel(filePath)
        sheet = result_sheet_dict['sheet']
        row_max = result_sheet_dict['row_max']
        column_max = result_sheet_dict['column_max']

        # 2、设定处理行数范围
        # 最小行数 默认为1
        process_row_min = int(input('请输入需要处理的最小行数:'))
        # 最大行数 默认为None,代表处理到最后一行
        process_row_max = input('请输入需要处理的最大行数:')
        if not process_row_max:
            process_row_max = None
        else:
            process_row_max = int(process_row_max)

        # 3、设定 要处理的 特定列的索引值(从1开始算起)
        str_user_choice = input('请输入需要处理特定列索引,用句号(英文格式‘.’)分隔:')
        lst = str_user_choice.split('.')
        column_index = [int(item) for item in lst if item != '']

        # 二、用户选择:
        print('功能菜单如下:')
        print('''
            1、处理每行的所有列;
            2、获取特定行的特定列的值并打印;
            3、获取特定行的特定列的值并保存为csv文件;
            4、对特定行的特定列 进行数据处理;
        ''')
        user_choice = int(input('请选择:'))

        if user_choice == 1:
            # 调用处理方法:处理每行的所有列
            result_process_perRow = MyFuncRreadExcel.process_perRow(sheet, row_max, process_row_min, process_row_max)
        elif user_choice == 2:
            # 调用处理方法: 处理每行特定的几列
            # 得到 特定行范围内的特定列的值
            result_process_Selected_column = MyFuncRreadExcel.process_Selected_column(sheet, row_max, column_index,
                                                                                      process_row_min, process_row_max)
            print(result_process_Selected_column)
        elif user_choice == 3:
            # 调用处理方法: 处理每行特定的几列
            # 得到 特定行范围内的特定列的值
            result_process_Selected_column = MyFuncRreadExcel.process_Selected_column(sheet, row_max, column_index,
                                                                                      process_row_min, process_row_max)

            # 获取文件保存路径
            filePath2csv = input("请输入您要保存csv文件的路径+文件名:")

            # 调用csv保存方法
            MyFuncRreadExcel.save2csv(filePath2csv, result_process_Selected_column)

        elif user_choice == 4:
            print("正在快马加鞭滴开发ing!")


if __name__ == '__main__':
    # 一、常量配置
    # 1、excel 文件路径
    filePath = './链家二手房.xlsx'

    # 实例化
    MyFuncRreadExcel = MyFuncRreadExcel()
    # 调用 main 函数
    MyFuncRreadExcel.main(filePath)
原文地址:https://www.cnblogs.com/gengyufei/p/12688102.html