基于Python的数据分析(3):文件和时间

在接下来的章节中,我会重点介绍一下我自己写的基于之前做python数据分析的打包接口文件common_lib,可以认为是专用于python的第三方支持库。common_lib目前包括文件操作、时间操作、excel接口操作、数据库接口、邮件接口。这些打包接口的作用就像是堆积木一样,把积木的主要模块都搭好了,仅需要大家按照自己的想法把它们拼接在一起堆出你的理想城堡!

捕获11

上面是common_lib的的文件目录结构,还处于不断的修改和完善的过程中,相信后续会有更多实用的接口文件会补充进来。命名方式参考之前写的《python的统一编码规范》,根据名字可以基本知道各文件对应的功能接口。

在本篇文章中我们将会接受文件和时间相关的操作接口,对应的是和io_datetime。

io_file

这个文件引用了一个标准模块,就是os。标准模块的意思就是不需要再下载安装的,python自带的接口文件。

python os模块包括了操作系统的基础功能,更为重要的是,如果希望编写的python程序跨平台运行的话,os模块尤其适合你。对这个模块感兴趣的同学可以敲以下两行代码进一步学习,不会让你失望的:

import os
print help(os)

好,我们进入主题。

在io_file我主要添加了在数据分析中常用到的对文件的处理,下面是函数介绍。

def has_file(filename): #判断是否存在给定的filename的文件

def is_file(filename): #判断给定filename是否是一个文件

def get_file_name(filename):#获得给定的filename的文件名

def get_file_type(filename): #获得给定的filename的类型

def get_curr_dir(): #获得当前的程序运行的路径

def get_files(path): #获得给定路径的所有文件名

def get_curr_files(dir=None):#获得当前路径中的所有文件名

def get_file_size(filename): #获得某个文件的大小,不常用

def delete_file(filename): #删除指定filename

def readlines_file(file_name, flag=None): #按照逐行的方式读取文件的所有内容

def writelines_file(file_name, data, flag=None):#按照逐行的方式写入所有内容。

def cmd_open(filename):#打开文件或者路径

上面的filename的定义是获取到的文件的绝对路径,例如“D: emp.txt”这样。

文件读写

下面重点介绍读写文件的两个函数。

关于python的文本文件读写(仅针对文本)分别有三种:

读文件:

read():以字节的形式读取整个文件,并输出字符串,包括换行符。

readline():读取一行内容,输出字符串

readlines():读取每一行内容,并输出以行数组。

写文件:write()、writerline()、writelines()三个函数和读文件的函数类似。

在我的文件读写接口中采用的是readlines()和writelines(),原因是它们以数组返回结果(这算什么原因??/(ㄒoㄒ)/~~)

以下是读文件的源码:

def readlines_file(file_name, flag=None):
    if not flag:
        flag="r"
    try:
        with open(file_name, flag) as f:
            data = f.readlines()
            return data
    except Exception, e:
        print "readlines_file:", str(e)
        # traceback.print_exc()
        return False

输入的参数有两个:file_name 选择读入的文件名,flag是读写标记默认为空。如果成功返回文件数据,否则返回False。

with open() as ..这种用法是Pythonic风格的代码。这种用法会除了让代码更简洁以外,还有其他好处。更多关于Pythonic的讲解可以参阅书籍《编写高质量代码-改善Python程序的91个建议》。

关于读写标记的说明,读我默认是只读”r”,写是以追加的标记写”a+”,关于读写的标记,我从网上摘录了一段,忘了出处了:

关于文件的读写模式的说明:
r 打开只读文件,该文件必须存在。
r+ 打开可读写的文件,该文件必须存在。
w 打开只写文件,若文件存在则文件长度清为0,即该文件内容会消失。若文件不存在则建立该文件。
w+ 打开可读写文件,若文件存在则文件长度清为零,即该文件内容会消失。若文件不存在则建立该文件。
a 以附加的方式打开只写文件。若文件不存在,则会建立该文件,如果文件存在,写入的数据会被加到文件尾,即文件原先的内容会被保留。
a+ 以附加方式打开可读写的文件。若文件不存在,则会建立该文件,如果文件存在,写入的数据会被加到文件尾后,即文件原先的内容会被保留。
上述的形态字符串都可以再加一个b字符,如rb、w+b或ab+等组合,加入b 字符用来告诉函数库打开的文件为二进制文件,而非纯文字文件。不过在POSIX系统,包含Linux都会忽略该字符。
+同时读写,即可读又可写,边写边读、边读边写,不用flush,用seek 和 tell可测得。

写的代码与读类似,就不再赘述:

def writelines_file(file_name, data, flag=None):
    if not flag:
        flag="a+"
    try:
        with open(file_name, flag) as f:
            f.writelines(data)
            return True
    except Exception, e:
        print "writelines_file:", str(e)
        # traceback.print_exc()
        return False

io_datetime

在日期时间(就这样翻译datetime是不是太丑了??)接口文件中,主要通过解决下面两个问题来满足数据分析的需要

1.解决日期类型之前转换的问题,主要是string类型、datetime类型、time类型三种类型之间变量类型的转换。

2.解决两个日期变量的间隔时长,包括工作日差等。

第一个问题目前来看主要是由于excel数据源的问题,excel单元格中存储时间数据的格式可能是文本格式的,也有可能是时间或者日期格式的。第二个问题主要是因为在分析工作中需要计算服务SLA的情况,因此需要计算日期差和工作日差。

在io_datetime中需要导入的标准库有time、datetime和re以及第三方库dateutil,都是用来处理日期时间格式。

def get_now(pattern=None): #获得当前日期时间,pattern参数是设置格式默认是'%Y-%m-%d %H:%M:%S',输出字符串

def str2date(date, pattern):#将字符串转换成datetime格式,参数是字符串的格式

def str2date_parse(date_str):#利用dateutil库来将字符串转换成datetime,好处是不需要输入格式

def time2str(time1, pattern):#把time格式的转换成字符串

def get_date_pattern(date1):#利用正则表达式把常见的日期时间格式提取出来

def date2str(date1, pattern=None):#将datetime格式转换成字符串

def str2time(date1, pattern=None):#将字符串格式转换成time格式

上述的函数是用来处理第一个问题的。对于获取日期时间格式的函数,我着重讲一下,发现目前还没人做这方面的工作:

def get_date_pattern(date1):
    date1 = str(date1).strip()
    if re.match(r"dddd-d{1,2}-d{1,2} d{1,2}:d{1,2}:d{1,2}", date1):
        return "%Y-%m-%d %H:%M:%S"
    elif re.match(r"dddd-d{1,2}-d{1,2}", date1):
        return "%Y-%m-%d"
    elif re.match(r"dddd/d{1,2}/d{1,2} d{1,2}:d{1,2}:d{1,2}", date1):
        return "%Y/%m/%d %H:%M:%S"
    elif re.match(r"dddd/d{1,2}/d{1,2}", date1):
        return "%Y/%m/%d"
    elif re.match(r"dd/dd/dd d{1,2}:d{1,2}:d{1,2}", date1):
        return "%m/%d/%y %H:%M:%S"
    elif re.match(r"dd/dd/dd", date1):
        return "%m/%d/%y"
    else:
        return None

通过get_date_pattern就可以把excel非中文的日期时间格式覆盖了。中文日期时间格式的如果有需要也可以补充进来。

下面是解决第2个问题的处理函数:

def subtract_date(date1, date2, days=None, seconds=None): #获取任意两个date之间的相差的天数或者秒数(由参数决定)

def subtract_work_day(date1, date2): #获取任意两个日期之间相差的工作日(9:00~17:00)天数,相差的小时数同样计入差额.

对于工作日做差的计算的计算方法,我之前想在网上搜索一下已有的算法,没有找到。我就根据现有的计算逻辑实现了接口算法。逻辑如下:

1.工作日的范围是9:00~17:00。节假日和工作日定义按照国家的做法定义(国务院放假办12月会发布下一年的放假安排)。除了已经被定义为工作日的天数以外,其余皆算为非工作日。因此两个日期的天数之差应该排除中间的非工作日天数。

2.小时数差的话,先将两个日期进行归一化和转义,得出两个日期的时间肯定在[9:00,17:00]这个区间。然后在区间内进行计算。

具体的实现代码如下,有点复杂:

def subtract_work_day(date1, date2):
    """
    do the subtract operation between two working dates, ignore the weekends,get day increment
    :param date1:input date1
    :param date2:input date2
    :return:the time diff (value as a day) between date1 and date2
    """

    try:
        d1 = str2date_parse(date1)
        d2 = str2date_parse(date2)
        maxDate = max([d1, d2])
        #把跨天的日期拉回来
        if maxDate.hour < 9:
            maxDate = str2date(date2str(maxDate + datetime.timedelta(days=-1), "%Y-%m-%d")
                               + " 17:00:00", '%Y-%m-%d %H:%M:%S')
        minDate = min([d1, d2])
        #将时间归一到[9:00~17:00]这个区间
        maxDate = transfer(maxDate)
        minDate = transfer(minDate)
        #间隔的天数
        total = (maxDate - minDate).days
        wk = 0
        if total >= 1:
            while (minDate < maxDate):
                if is_weekend(minDate):
                    wk += 1
                minDate = minDate + datetime.timedelta(days=1)
        result = total - wk
        if result < 0:
            result = 0
        minDate = transfer(min([d1, d2]))
        max_hour = maxDate.hour
        min_hour = minDate.hour
        max_minute = maxDate.minute
        min_minute = minDate.minute
        #确保不是同一天
        if date2str(maxDate, "%Y-%m-%d") != date2str(minDate, "%Y-%m-%d"):
            if is_weekend(maxDate) and is_weekend(minDate):
                temp=0
            elif is_weekend(minDate):
                temp=1.0*(max_hour-9)/8
            elif is_weekend(maxDate):
                temp=1.0*(17-min_hour)/8
            else:
                if min_hour > max_hour or (max_hour == min_hour and min_minute > max_minute):
                    temp = min_hour - max_hour
                    if max_hour == min_hour:
                        temp += (min_minute - max_minute) // 60
                    temp = 1 - 1.0 * temp / 8
                else:
                    temp = max_hour - min_hour
                    if max_hour == min_hour:
                        temp += (max_minute - min_minute) // 60
                    temp = 1.0 * temp / 8
        else:
            #如果是同一天,max_date的小时分钟数肯定大于min_date的小时分钟数
            if is_weekend(maxDate):
                temp=0
            else:
                temp = max_hour - min_hour
                if max_hour == min_hour:
                    temp += (max_minute - min_minute) // 60
                temp = 1.0 * temp / 8
        result += temp
        return result
    except Exception, e:
        print "ERROR IN subtract_date_working: " + str(e)
        traceback.print_exc()
        return None

def is_weekend(date1):
    #they are not weekends as defined, although they are not week days
    wrong_wk = ['2016/02/06', '2016/02/14', '2016/06/12', '2016/09/18', '2016/10/08',
                '2016/10/09']
    #they are weekends as defined, although they are week days
    right_wk = ['2015/10/01', '2015/10/02', '2015/10/05', '2015/10/06', '2015/10/07', 
                '2016/01/01', '2016/02/08', '2016/02/09', '2016/02/10', '2016/02/11', 
                '2016/02/12', '2016/04/04', '2016/05/02', '2016/06/09', '2016/06/10', 
                '2016/09/15', '2016/09/16', '2016/10/03', '2016/10/04', '2016/10/05', 
                '2016/10/06', '2016/10/07']
    if date2str(date1, "%Y/%m/%d") in wrong_wk:
        return False
    elif date2str(date1, "%Y/%m/%d") in right_wk:
        return True
    else:
        wd = date1.weekday()
        if wd == 5 or wd == 6:
            return True
        return False

def transfer(date1):
    if date1.hour >= 17:
        return str2date(date2str(date1, "%Y-%m-%d") + " 17:00:00", '%Y-%m-%d %H:%M:%S')
    elif date1.hour < 9:
        return str2date(date2str(date1, "%Y-%m-%d") + " 09:00:00", '%Y-%m-%d %H:%M:%S')
    else:
        return date1

需要源代码的同学请加我微博@cec_Fudan 私信留言吧。

本文原创,转载请征得作者同意,保留所有权利。

原文地址:https://www.cnblogs.com/kendrick/p/5193517.html