财经数据(4)-Tushare金融接口数据获取

Tushare接口数据请求:分当天、历史两种情况。通过flag标签控制

由于部分接口请求限制,采用offset限制每次请求数量,time.sleep控制请求间隔时间

部分数据才用drop_duplicates(keep=False)去除所有重复数据

数据利用pandas写入mysql数据库。

数据表字段设置,详见:

Tushare接口,详见:https://tushare.pro/document/2

# -*- coding: utf-8 -*-
import pandas as pd
import tushare as ts
import time
import datetime
from sqlalchemy import create_engine
import requests
import json
import datetime


# ====================股票列表:stock_basic======================================================================================================================
def stockList(engine):
    print("-------------------------------------------")
    print("开始从Tushare接口获取股票列表数据")
    l_list = pro.stock_basic(list_status='L', fields='ts_code,symbol,name,area,list_status,list_date')
    p_list = pro.stock_basic(list_status='P', fields='ts_code,symbol,name,area,list_status,list_date')
    exist_list = pd.read_sql('select * from 股票列表', engine)

    total_list = pd.concat([l_list, p_list, exist_list], axis=0, sort=False)
    stock_list = total_list.drop_duplicates(subset=None, keep=False)

    stock_list.to_sql('all_stock', engine, if_exists='append', index=False)

    print(stock_list)
    print("本次存储股票列表数据%s条" % stock_list.shape[0])


# ====================指数日线行情:index_daily===================================================================================================================
def indexTrade(flag, engine, *args):
    # 单次最多请求8000条
    print("-------------------------------------------")
    print("开始从Tushare接口获取指数行情数据")
    if flag == "Y":
        # 获取交易日数据
        cur_df1 = pro.index_daily(ts_code='000001.SH', trade_date=current, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount')  # 上证指数
        cur_df2 = pro.index_daily(ts_code='399006.SZ', trade_date=current, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount')  # 创业板指数
        cur_df3 = pro.index_daily(ts_code='399106.SZ', trade_date=current, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount')  # 深证综指
        idx_daily = pd.concat([cur_df1, cur_df2, cur_df3], axis=0)
    elif flag == "N":
        # 获取历史交易数据
        his_df1 = pro.index_daily(ts_code='000001.SH', start_date=start, end_date=end, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount')  # 上证指数
        his_df2 = pro.index_daily(ts_code='399006.SZ', start_date=start, end_date=end, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount')  # 创业板指数
        his_df3 = pro.index_daily(ts_code='399106.SZ', start_date=start, end_date=end, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount')  # 深证综指
        idx_daily = pd.concat([his_df1, his_df2, his_df3], axis=0)

    idx_daily['amount'] = round(idx_daily['amount'].map(lambda x: x / 10), 2)
    idx_daily.to_sql('market_index', engine, if_exists='append', index=False)

    print(idx_daily)
    print("本次存储指数行情数据%s条" % idx_daily.shape[0])


# ====================日线行情:daily============================================================================================================================
def dailyData(flag, engine, *args):
    # 本接口是未复权行情,停牌期间不提供数据,每分钟内最多调取200次,每次4000条数据,交易日当天15点~16点之间更新
    print("-------------------------------------------")
    print("开始从Tushare接口获取个股行情数据")
    # 创建空DataFrame
    daily = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        for offset in range(0, 9000, 3000):
            cur_daily = pro.daily(trade_date=current, fields='ts_code,trade_date,open,high,low,close,pre_close,pct_chg,amount', offset=offset, limit=3000)
            daily = daily.append(cur_daily, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        for offset in range(0, 360000, 3000):
            his_daily = pro.daily(start_date=start, end_date=end, fields='ts_code,trade_date,open,high,low,close,pre_close,pct_chg,amount', offset=offset, limit=3000)
            daily = daily.append(his_daily, ignore_index=True)

    daily['amount'] = daily['amount'].map(lambda x: round(x / 10, 2))
    daily.to_sql('stock_tradeing_data', engine, if_exists='append', index=False)

    print(daily)
    print("本次存储个股行情数据%s条" % daily.shape[0])


# ====================每日指标:daily_basic======================================================================================================================
def dailyBasic(flag, engine, *args):
    # 交易日每日15点~17点之间更新
    print("-------------------------------------------")
    print("开始从Tushare接口获取个股基础行情指标数据")
    # 创建空的dataframe
    daily_basic = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        for offset in range(0, 9000, 3000):
            cur_basic = pro.daily_basic(trade_date=current, fields='ts_code,trade_date,turnover_rate_f,volume_ratio,pe_ttm,pb,ps_ttm,dv_ttm,total_share,float_share,total_mv,circ_mv', offset=offset, limit=3000)
            daily_basic = daily_basic.append(cur_basic, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        for offset in range(0, 360000, 3000):
            his_basic = pro.daily_basic(start_date=start, end_date=end, fields='ts_code,trade_date,turnover_rate_f,volume_ratio,pe_ttm,pb,ps_ttm,dv_ttm,total_share,float_share,total_mv,circ_mv', offset=offset, limit=3000)
            daily_basic = daily_basic.append(his_basic, ignore_index=True)

    daily_basic.to_sql('basic_stock_index', engine, if_exists='append', index=False)

    print(daily_basic)
    print("本次存储个股基础行情指标数据%s条" % daily_basic.shape[0])


# ====================沪深港通资金流向:moneyflow_hsgt==========================================================================================================
def hsgtMoneyflow(flag, engine, *args):
    # 每次最多返回300条记录,总量不限制
    print("-------------------------------------------")
    print("开始从Tushare接口获取沪深港通资金数据")
    # 创建空DataFrame
    hsgt = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        cur_hsgt = pro.moneyflow_hsgt(trade_date=current, fields='trade_date,hgt,sgt,north_money')
        hsgt = hsgt.append(cur_hsgt, ignore_index=True)
    elif flag == "N":
        # 获取历史数据
        for offset in range(0, 900, 300):
            his_hsgt = pro.moneyflow_hsgt(start_date=start, end_date=end, fields='trade_date,hgt,sgt,north_money', offset=offset, limit=300)
            hsgt = hsgt.append(his_hsgt, ignore_index=True)

    hsgt['hgt'] = hsgt['hgt'].map(lambda x: round(x * 100, 1))
    hsgt['sgt'] = hsgt['sgt'].map(lambda x: round(x * 100, 1))
    hsgt['north_money'] = hsgt['north_money'].map(lambda x: round(x * 100, 1))

    hsgt.to_sql('north_fund', engine, if_exists='append', index=False)

    print(hsgt)
    print("本次存储沪深港股通资金数据%s条" % hsgt.shape[0])


# ====================个股资金流向:moneyflow===================================================================================================================
def stcokMoneyflow(flag, engine, *args):
    # 单次最大提取4000行记录,总量不限制
    print("-------------------------------------------")
    print("开始从Tushare接口获取个股大单资金数据")
    # 创建空的dataframe
    dd_money = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        for offset in range(0, 9000, 3000):
            cur_money = pro.moneyflow(trade_date=current, offset=offset, limit=3000)
            dd_money = dd_money.append(cur_money, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        for offset in range(0, 360000, 3000):
            his_money = pro.moneyflow(start_date=start, end_date=end, offset=offset, limit=3000)
            dd_money = dd_money.append(his_money, ignore_index=True)

    dd_money.to_sql('big_stock_fund', engine, if_exists='append', index=False)

    print(dd_money)
    print("本次存储个股大单资金数据%s条" % dd_money.shape[0])


# ====================大宗交易:block_trade=====================================================================================================================
def blockTrade(flag, engine, *args):
    # 单次最大1000条,总量不限制
    print("-------------------------------------------")
    print("开始从Tushare接口获取大宗交易数据")
    block_trade = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        cur_block = pro.block_trade(trade_date=current, fields='ts_code,trade_date,price,amount,buyer,seller')
        block_trade = block_trade.append(cur_block, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        for offset in range(0, 9000, 1000):
            his_block = pro.block_trade(start_date=start, end_date=end, fields='ts_code,trade_date,price,amount,buyer,seller', offset=offset, limit=1000)
            block_trade = block_trade.append(his_block, ignore_index=True)

    block_trade.to_sql('bulk_trading', engine, if_exists='append', index=False)

    print(block_trade)
    print("本次存储大宗交易数据%s条" % block_trade.shape[0])


# ====================每日涨跌停统计:limit_list=================================================================================================================
def zdCount(flag, engine, *args):
    # 单次最大1000,总量不限制
    print("-------------------------------------------")
    print("开始从Tushare接口获取涨跌停统计数据")
    limit = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        for offset in range(0, 3000, 1000):
            cur_limit = pro.limit_list(trade_date=current, fields='trade_date,ts_code,fc_ratio,fd_amount,first_time,last_time,open_times,strth,limit', offset=offset, limit=1000)
            limit = limit.append(cur_limit, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        for offset in range(0, 90000, 1000):
            his_limit = pro.limit_list(start_date=start, end_date=end, fields='trade_date,ts_code,trade_date,fc_ratio,fd_amount,first_time,last_time,open_times,strth,limit', offset=offset, limit=1000)
            limit = limit.append(his_limit, ignore_index=True)

    limit.to_sql('up_to_down_detail', engine, if_exists='append', index=False)

    print(limit)
    print("本次存储涨跌停统计数据%s条" % limit.shape[0])


# ====================Shibor利率数据:shibor====================================================================================================================
def shiborRate(flag, engine, *args):
    # 单次最大2000,总量不限制
    print("-------------------------------------------")
    print("开始从Tushare接口获取shibor利率数据")
    shibor = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        cur_shibor = pro.shibor(date=current, fields='date,on,1w,3m')
        shibor = shibor.append(cur_shibor, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        his_shibor = pro.shibor(start_date=start, end_date=end, fields='date,on,1w,3m')
        shibor = shibor.append(his_shibor, ignore_index=True)

    shibor.to_sql('shibor_rate', engine, if_exists='append', index=False)

    print(shibor)
    print("本次存储shibor利率数据%s条" % shibor.shape[0])


# ====================Tushare股东人数======================================================================================================================
def holderNumber(flag, engine, *args):
    # 单次最大3000,总量不限制,每分钟调取100次
    print("-------------------------------------------")
    print("开始从Tushare接口获取股东人数数据")

    count = 0
    holdernum = pd.DataFrame()
    if flag == "Y":
        # 获取交易日数据
        for offset in range(0, 0000, 3000):
            hdrm = pro.stk_holdernumber(start_date=current, end_date=current, fields='ts_code,end_date,holder_num', offset=offset, limit=3000)
            holdernum = holdernum.append(hdrm, ignore_index=True)
    elif flag == "N":
        # 获取历史交易数据
        for offset in range(0, 90000, 3000):
            hdrm = pro.stk_holdernumber(start_date=start, end_date=end, fields='ts_code,end_date,holder_num', offset=offset, limit=3000)
            holdernum = holdernum.append(hdrm, ignore_index=True)

    holdernum.to_sql('shareholder_number', engine, if_exists='append', index=False)

    print(holdernum)
    print("获取股东人数数据%s条" % holdernum.shape[0])


# ====================主函数====================================================================================================================================
if __name__ == '__main__':
    print("Tushare接口程序开始执行")
    print("-------------------------------------------")
    begin = time.time()

    # 初始化tushare.pro接口,创建mysql数据库引擎
    pro = ts.pro_api('ac16b470869c5d82db5033ae9288f77b282d2b5519507d6d2c72fdd7')
    engine = create_engine('mysql://root:123456@127.0.0.1/quant?charset=utf8')

    # 判断是否要获取交易日数据
    flag = input("是否获取交易日数据,选择Y或N:")
    if flag == "Y":
        print("开始从Tushare接口获取当前交易日数据")

        # 获取当天日期
        current = time.strftime("%Y%m%d", time.localtime())

        stockList(engine)                       # 获取股票列表
        indexTrade(flag, engine, current)       # 指数行情数据
        dailyData(flag, engine, current)        # 个股行情数据
        dailyBasic(flag, engine, current)       # 个股基础行情指标
        hsgtMoneyflow(flag, engine, current)    # 北向资金
        stcokMoneyflow(flag, engine, current)   # 个股大单资金
        blockTrade(flag, engine, current)       # 大宗交易
        zdCount(flag, engine, current)          # 涨跌停统计
        shiborRate(flag, engine, current)       # shibor利率
        holderNumber(flag, engine, current)     # 股东人数

    elif flag == "N":
        print("开始从Tushare接口获取历史数据")

        # 获取历史起始日期
        start = input("时间格式如:19491001,请输入起始日期:")
        end = input("时间格式如:19491001,请输入截止日期:")

        stockList(engine)                           # 获取股票列表
        indexTrade(flag, engine, start, end)        # 指数行情数据
        dailyData(flag, engine, start, end)         # 个股行情数据
        dailyBasic(flag, engine, start, end)        # 个股基础行情指标
        hsgtMoneyflow(flag, engine, start, end)     # 北向资金
        stcokMoneyflow(flag, engine, start, end)    # 个股大单资金
        blockTrade(flag, engine, start, end)        # 大宗交易
        zdCount(flag, engine, start, end)           # 涨跌停统计
        shiborRate(flag, engine, start, end)        # shibor利率
        holderNumber(flag, engine, start, end)      # 股东人数

    ed = time.time()
    print('本次程序共执行%0.2f秒.' % ((ed - begin)))
    print("Tushare接口程序执行完成")

  

  

  

  

原文地址:https://www.cnblogs.com/Iceredtea/p/12108701.html