RollingRegression(滚动回归分析)之Python实现

# -*- coding: utf-8 -*-
"""
Created on Sat Aug 18 11:08:38 2018

@author: acadsoc
"""

import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from pyecharts import Bar, Line, Page, Overlap
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler
# import pymssql
from dateutil import parser
import copy
import os
import sys
from featureSelection import featureSelection

plt.style.use('ggplot') # 设置ggplot2画图风格
# 根据不同平台设置其中文字体路径
if sys.platform == 'linux':
    zh_font = matplotlib.font_manager.FontProperties(
        fname='path/anaconda3/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/STZHONGS.TTF')
else:
    zh_font = matplotlib.font_manager.FontProperties(fname='C:WindowsFontsSTZHONGS.ttf')  # 设置中文字体

# 根据不同平台设定工作目录
if sys.platform == 'linux':
    os.chdir(path) # Linux path
else:
    os.chdir(path) # Windows path

class rollingRegression():
    '''
    滚动多元回归分析类。
    
    参数
    ----
    response : str
        回归因变量。
    date_begin : datetime
        起始日期。
    date_end : datetime
        截止日期。
    rolling_days : int
        滚动天数。
    intercept : bool
        回归方程是否带常数项。
    p_value_threshold : float
        回归系数按p值显示阈值。
    normalize : bool
        数据是否标准化。
        
    属性
    ----
    coef_ : dataframe
        回归系数。
    coef_pvalue_ : dataframe
        回归系数pvalue。
    r2_ : dataframe
        回归模型Rsquared 和 Rsquared_adj。
    echarts_page_ : echarts_page
        echarts_page 文件。       
    '''
    def __init__(self, response='新单数', date_begin='2018-01-01', date_end='2018-07-31', rolling_days=30,
                 intercept=False, p_value_threshold=.1, normalize=False):
        self.response = response  # 回归因变量
        self.date_begin = date_begin  # 起始日期
        self.date_end = date_end  # 终止日期
        self.rolling_days = rolling_days  # 滚动天数
        self.intercept = intercept  # 回归方程是否带常数项
        self.p_value_threshold = p_value_threshold  # p值显示阈值
        self.normalize = normalize  # 是否将数据标准化后再进行回归分析
        if self.normalize:  # 如果数据标准化,常数强制设置为0
            self.intercept = False
        # 起始日期间隔必须大于等于滚动天数
        if (parser.parse(self.date_end) - parser.parse(self.date_begin)).days < self.rolling_days:
            raise IOError('起始日期间隔必须大于等于滚动天数,请重新选择起始日期或者调整滚动日期。')    
            
    def getData(self, file='业绩相关数据2018-8-1.xlsx', variabls_in=None, variables_out=None):
        '''
        读取数据。
        注:该方法只适合读取特定方式的数据,第一列应是datetime类型,第二列是因变量,其余是自变量。
        
        参数
        ----
        file : str
            文件路径及文件名。
        variabls_in : list, 默认是None
            需要用到的自变量。
        variables_out : list, 默认是None
            不用的自变量。
            
        return
        ------
        df_ : dataframe
            分析用数据框,response在第一列。
        '''
        if variabls_in:   # 如果有选入模型的自变量,强制将variable_out设置为None
            variables_out = None
        
        if file.split('.')[-1] == 'xlsx':  # 根据不同文件格式读取数据        
            df = pd.read_excel(file)  
        elif file.split('.')[-1] == 'txt':
            df = pd.read_table(file)
        else:
            df = eval('pd.read_' + file.split('.')[-1] + '(file)')

        df.index = df.iloc[:, 0]  # 将日期变为索引
        df = df.iloc[:, 1:]             
        df[df.isnull()] = 0  # 缺失值填充        
        df = df.astype(float)  # 将数据框object格式转换为float
        # dateTransfer = np.vectorize(self._dateTransfer)   # 向量化日期转换函数                
        # df.index = dateTransfer(df.index) # 转换索引日期格式          
        df.index = pd.DatetimeIndex(df.index)   # 将索引转换为datetime格式
        
        if variabls_in:
            df = pd.concat([df[df.columns[0]], df[variabls_in]], axis=1)
        
        if variables_out:
            for var in variables_out:
                df.pop(var)            
       
        if self.normalize:   # 数据标准化     
            df_std = StandardScaler().fit_transform(df)
            self.df_ = pd.DataFrame(df_std, index=df.index, columns=df.columns)    
        else:
            self.df_ = df
        
        return self
    
    def rollingOLS(self, df):
        '''
        滚动日期多元线性模型。
        
        参数
        ----
        df : dataframe
            回归分析用数据框,response在第一列。
            
        return
        ------
        coef : dataframe
            回归系数。
        coef_pvalue : dataframe
            回归系数pvalue。
        r2 : dataframe
            回归模型Rsquared 和 Rsquared_adj。
        '''
        df = df.loc[(df.index>=self.date_begin) & (df.index<=self.date_end), :]   # 按照参数给定起始、截止时间选择数据
        df = df.sort_index(ascending=True)  # 按日期升序排序
        coef = {}
        coef_pvalue = {}
        r2 = {}

        # 从起始日开始做回归
        for i in range(df.shape[0] - self.rolling_days):
            date = df.index[i+self.rolling_days]   
            data = df.iloc[i:i+self.rolling_days, :]
            X = data.iloc[:, 1:]
            y = data.iloc[:, 0]        
            # 线性回归模型拟合    
            model = sm.OLS(y, X, hasconst=self.intercept)
            lr = model.fit()

            # 按字典格式保存系数、pvalue、R2
            coef[date] = lr.params            
            coef_pvalue[date] = lr.pvalues
            r2[date] = []
            r2[date].append(lr.rsquared)
            r2[date].append(lr.rsquared_adj)

        # 系数字典转化为数据框,并按日期升序排序
        coef = pd.DataFrame.from_dict(coef, orient='index')
        coef = coef.sort_index(ascending=True)

        # 系数pvalue转化为数据框,并按日期升序排序
        coef_pvalue = pd.DataFrame.from_dict(coef_pvalue, orient='index')
        coef_pvalue = coef_pvalue.sort_index(ascending=True)

        # R2转化为数据框,并按日期升序排序
        r2 = pd.DataFrame.from_dict(r2, orient='index')
        r2.columns = ['R_squred','R_squred_adj']
        r2 = r2.sort_index(ascending=True)
        return coef, coef_pvalue, r2
    
    def _dateTransfer(self, date):
        '''
        定义日期转换函数。
        
        参数
        ----
        date : str
            需要转换的日期数据。
        
        return
        ------
        date : datetime
            日期。
        '''
        return parser.parse(date).strftime('%Y-%m-%d')    
    
    def fit(self, feat_selected=None):
        '''
        多元回归分析并保存数据。
        
        参数
        ----
        feat_selected : list, 默认是None
            分析用的特征列表。
            
        return
        ------
        coef_ : dataframe
            回归系数。
        coef_pvalue_ : dataframe
            回归系数pvalue。
        r2_ : dataframe
            回归模型Rsquared 和 Rsquared_adj。
        '''
        if feat_selected is not None:
            df = pd.concat([self.df_.iloc[:, 0], self.df_[feat_selected]], axis=1)
        else:
            df = self.df_
        # 滚动回归分析        
        self.coef_, self.coef_pvalue_, self.r2_ = self.rollingOLS(df)  
        # 存储分析数据表
        self.coef_.to_excel('coef.xlsx')
        self.coef_pvalue_.to_excel('coef_pvalue.xlsx')
        self.r2_.to_excel('r2.xlsx')        
        return self    
    
    def coefPlots(self, width_subplot=12, height_subplot=5, columns_subplots=3):
        '''
        画滚动回归系数及pvalue图。
        
        参数
        ----
        width_subplot : int
            子图宽度。
        height_subplot : int
            子图高度。
        columns_subplots : int
            子图列数。
        '''
        num_subplots = self.coef_.shape[1] + 1  # 确定子图个数
        # 确定子图行数
        if num_subplots % columns_subplots == 0: # 余数为0
            rows_subplots = num_subplots // columns_subplots  # 取整
        else:
            rows_subplots = num_subplots // columns_subplots + 1
        # 确定画布宽、高
        width_figure = columns_subplots * width_subplot
        height_figure = rows_subplots * height_subplot
        
        # 绘制滚动回归R2图
        plt.figure(figsize=(width_figure, height_figure))
        plt.subplot(rows_subplots, columns_subplots, 1)
        plt.plot(self.r2_['R_squred'], color='r', lw=3, label='R_squred')
        plt.plot(self.r2_['R_squred_adj'], color='g', lw=3, label='R_squred_adj')
        plt.title('R2')
        plt.legend()
        # 在子图中画系滚动回归系数及p值图
        for i, feature in enumerate(self.coef_.columns):  # 系数图
            plt.subplot(rows_subplots, columns_subplots, i+2)
            plt.plot(self.coef_[feature], color='red', lw=3, label='Beta')

            for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]):  # p值图
                if pvalue <= self.p_value_threshold:
                    plt.vlines(t, ymin=np.min(self.coef_[feature]), ymax=np.max(self.coef_[feature]),
                               color='green', alpha=.3, lw=5, label='p_value')

            #plt.xlabel('日期')
            if ((i + columns_subplots + 1) % columns_subplots) & (i > 0) == 0:
                plt.ylabel('coef')
            plt.title(feature, fontproperties=zh_font)
        # plt.savefig('rollingRegression.jpeg') # 保存图片
        plt.show()
        return self    
   
    def coefEcharts(self):
        '''
        利用Echarts画图。
        注:因为没有vline方法,故用echarts画出的图文件过大,在浏览器中打开很慢。
        
        参数
        ----
        
        return
        ------
        echarts_page_ : echarts_page
            echarts_page 文件。
        '''
        self.echarts_page_ = Page(self.response + '回归分析')
        charts = []
        zeros = np.zeros(self.coef_.shape[0])

        line = Line('R2')  # R2图
        bar = Bar()
        line.add('R_squred', self.r2_.index, self.r2_['R_squred'], is_more_utils=True)
        line.add('R_squred_adj', self.r2_.index, self.r2_['R_squred_adj'], is_more_utils=True)
        charts.append(line)

        for i, feature in enumerate(self.coef_.columns):  
            min_num = np.min(self.coef_[feature])
            max_num = np.max(self.coef_[feature])
            line = Line(feature)
            bar = Bar()
            ol = Overlap()
            line.add('coef', self.coef_.index, self.coef_[feature], is_more_utils=True) # 系数图
            #line.on()
            for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]):  # p值图
                if pvalue <= self.p_value_threshold:
                    min_array, max_array = copy.deepcopy(zeros), copy.deepcopy(zeros)
                    min_array[self.coef_.index==t] = min_num
                    max_array[self.coef_.index==t] = max_num
                    bar.add('p-value', self.coef_.index, min_array)
                    bar.add('p-value', self.coef_.index, max_array)

            ol.add(line)
            ol.add(bar)
            charts.append(ol)

        self.echarts_page_.add(charts)
        self.echarts_page_.render()  # 保存格式为HTML, 保存地址为设定的全局path
        return self

原文地址:https://www.cnblogs.com/lantingg/p/9535019.html