【股票基础选取】

import talib
import pandas as pd
import time
import numpy as np

class demo:
    
    
    def __init__(self):
        print('demo 类')
        self.h5_path = 'Z:/data/stock_data_py'
    def 加载数据(self,name,var):
        print('正在加载数据'+self.h5_path+'/'+var+'/'+name)
        df = pd.read_hdf(self.h5_path + '/' + var + '/' + name + '.h5')
        return df
    def demo_run(self,strategy_name):
        t = time.time() # 计时开始
        # 1.加载数据
        stocklist=self.加载数据('stocklist','list')
        stocklist['codenum'] = stocklist['code'].apply(lambda x: x[:6])
        closef_pd=self.加载数据('closef_pd','day')
        openf_pd=self.加载数据('openf_pd','day')
        highf_pd=self.加载数据('highf_pd','day')
        lowf_pd=self.加载数据('lowf_pd','day')
        amt_pd=self.加载数据('amt_pd','day')
        # 2.选股思路计算
        
        # MACD()
        def MACD():
            t = time.time() # 计时开始
            p = (12, 26, 9)
            def get_macd(x):
                DIF, DEA, MACD= talib.MACD(x.values, p[0], p[1], p[2])
                return MACD
            dfA = closef_pd.apply(get_macd, axis=0)
            print(time.time() - t) # 计时结束
            npA = dfA.values.T
            N = 1
            
            maxt = np.max(npA[:, -N - 5:-N-1], axis=1)
            
            ind=np.where( (np.max(npA[:, -N - 5:-N-1], axis=1) < 0) & (npA[:, -N] > 0) )[0]
            
            return ind
        
        # 可以扩展写一下其他的指标
        # KDJ 
        def KDJ():
            t = time.time() # 计时开始
            def get_kdj(x):
                ndim=x.name
                try:
                    k,d=talib.STOCH(highf_pd[ndim].values,lowf_pd[ndim].values,x.values,9,3,0,3,0)
                    j=3*d-2*k
                except:
                    j=np.array( [np.nan]*len(highf_pd) )
                return j
            dfA = closef_pd.apply(get_kdj, axis=0)
            print(time.time() - t) # 计时结束
            npA = dfA.values.T
            N = 1
            ind=np.where( (npA[:, -N-1] < 20) & (npA[:, -N] > 20) )[0]
            return ind
        # ind=KDJ()
        # RSI
        def RSI():
            t = time.time() # 计时开始
            def get_rsi(x):
                ndim=x.name
                rsi6=talib.RSI(x.values,6)
                rsi12=talib.RSI(x.values,12)
                rsi24=talib.RSI(x.values,24)
                rsi=rsi6-rsi12
                if  rsi6[-2]<50 and rsi[-2]<0 and  rsi[-1]>0:
                    return 1
                else:
                    return 0
            
            dfA = closef_pd.apply(get_rsi, axis=0)
            print(time.time() - t) # 计时结束
            npA = dfA.values.T
            N = 1
            ind=np.where( npA==1 )[0]
            return ind
        
        # BOLL
        def BOLL():
            t = time.time() # 计时开始
            def get_rsi(x):
                upper, middle, lower= talib.BBANDS(x.values, 20,2,2)
                return x.values-upper
            
            dfA = closef_pd.apply(get_rsi, axis=0)
            print(time.time() - t) # 计时结束
            npA = dfA.values.T
            N = 1
            
            ind=np.where( (np.max(npA[:, -N - 10:-N-1], axis=1) < 0) & (npA[:, -N] > 0) )[0]
            return ind
        
        # TRIX
        def TRIX():
            pass
            t = time.time() # 计时开始
            def get_trix(x):
                trix_data= talib.TRIX(x.values, 12)
                try:
                    trix_ma=talib.MA(trix_data,20)
                except:
                    trix_ma=np.array( [np.nan]*len(x) )
                return trix_data-trix_ma
            
            dfA= closef_pd.apply(get_trix, axis=0)
            print(time.time() - t) # 计时结束
            npA = dfA.values.T
            N = 1
            ind=np.where( (np.max(npA[:, -N - 10:-N-1], axis=1) < 0) & (npA[:, -N] > 0) )[0]
            return ind
        def 共振():
            # 求交集
            pass
            indA=TRIX()
            indB=MACD()
            indC=RSI()
            indD=BOLL()
            #数组的交集
            ind=np.intersect1d(indC,indB)
            return ind
        ind=eval(strategy_name+'()')
        temp = ind
        npB = stocklist.iloc[ind]
        print(time.time() - t) # 计时结束
        return npB
    
    

if __name__=='__main__':
    self=demo()
    listMACD=self.demo_run(strategy_name='MACD')
    #listKDJ=self.demo_run(strategy_name='KDJ')
    #listRSI=self.demo_run(strategy_name='RSI')
    # listBOLL=self.demo_run(strategy_name='BOLL')
    #listTRIX=self.demo_run(strategy_name='TRIX')
    #list共振=self.demo_run(strategy_name='共振')
原文地址:https://www.cnblogs.com/fyandy/p/9650896.html