Python基础(15)-线程、进程、协程

# -*- coding: utf-8 -*-
import tushare as ts
from jqdatasdk import *
import pandas as pd
import requests
import json
import pymysql
from redis import Redis
from multiprocessing import Pool

import time
import datetime
from retry import retry
import warnings

warnings.filterwarnings('ignore')


# -------------------------------------------------------------------------------------------------------------
# mysql数据录入

def table_insert(dataframe, table, data):
    """Bulk-insert rows into a MySQL table whose columns mirror a DataFrame.

    Args:
        dataframe: Object whose ``columns`` supply the target column names, in
            the same order as the values inside each row of ``data``.
        table: Name of the destination table. It is interpolated into the SQL
            text verbatim, so it must come from trusted code, never from user
            input.
        data: Sequence of row tuples handed to ``cursor.executemany``.

    Returns:
        None. Failures are printed and rolled back rather than raised.
    """
    # Nothing to write: skip the database round-trip entirely.
    if not data:
        return

    columns = dataframe.columns.tolist()
    keys = ','.join(columns)
    values = ','.join(['%s'] * len(columns))
    sql = 'insert into {table}({keys}) VALUES({values})'.format(table=table, keys=keys, values=values)

    conn = pymysql.connect(host='localhost', user='root', passwd='123456', port=3306, db='quant', charset='utf8mb4')
    try:
        cursor = conn.cursor()
        try:
            # Commit only when executemany reports at least one affected row.
            if cursor.executemany(sql, data):
                print('insert successful')
                conn.commit()
        except Exception as error:
            print('insert failed')
            print(error)
            conn.rollback()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even if rollback itself fails.
        conn.close()


# -------------------------------------------------------------------------------------------------------------
# tushare:股票列表

# @retry(tries=-1, delay=5)
def stock_list():
    """Download the stock roster from tushare, store it, and return the codes.

    Uses the module-level ``pro`` client to request ``stock_basic`` with
    ``list_status='L'``, normalises a few columns, writes every row to the
    ``stock_info`` table through ``table_insert`` and returns the stock codes.

    Returns:
        list: Values of the ``stk_code`` column after truncation to the first
        six characters.
    """
    print('stock_list函数开始执行')
    ordered_columns = ['stk_code', 'stk_name', 'province', 'industry', 'market', 'exchange', 'list_date']

    def substitute(old, new):
        # Build a per-cell replacer so every rule goes through Series.apply.
        return lambda text: text.replace(old, new)

    # 1. Request the data from the API and normalise it.
    frame = pro.stock_basic(list_status='L', fields='ts_code,name,area,industry,market,exchange,list_date')
    frame = frame.rename(columns={'ts_code': 'stk_code', 'name': 'stk_name', 'area': 'province'})
    frame['stk_code'] = frame['stk_code'].apply(lambda code: code[:6])
    for old, new in (('深圳', '广东'), ('内蒙', '内蒙古')):
        frame['province'] = frame['province'].apply(substitute(old, new))
    for token in ('SSE', 'SZSE'):
        frame['exchange'] = frame['exchange'].apply(substitute(token, ''))
    frame = frame[ordered_columns]

    print(frame.head(5))

    codes = frame['stk_code'].tolist()

    # 2. Turn every row into a tuple ordered like the table columns.
    rows = [tuple(record[name] for name in ordered_columns) for _, record in frame.iterrows()]

    # 3. Persist the rows to MySQL.
    table_insert(frame, 'stock_info', rows)

    return codes


# -------------------------------------------------------------------------------------------------------------
# tushare:概念股分类数据

# @retry(tries=-1, delay=5)
def concept_classify():
    """Return the ``code`` value of every row reported by ``pro.concept``.

    Returns:
        list: One entry per row of the concept classification table, in the
        order the API returned them.
    """
    print('concept_classify函数开始执行')
    concepts = pro.concept(fields='code,name')
    return [record['code'] for _, record in concepts.iterrows()]


# -------------------------------------------------------------------------------------------------------------
# tushare:概念明细

# @retry(tries=-1, delay=5)
def concept_detail(cept_id):
    """Fetch the member stocks of each concept and store them in ``concept_info``.

    Args:
        cept_id: Iterable of concept ids; each one is passed as ``id`` to
            ``pro.concept_detail``.

    Returns:
        None. Rows are written through ``table_insert``. When ``cept_id`` is
        empty nothing is requested or stored.
    """
    print('concept_detail函数开始执行')

    pages = []
    for concept_id in cept_id:
        pages.append(pro.concept_detail(id=concept_id, fields='concept_name,ts_code,name'))
        # Pause between requests -- presumably to respect the API rate limit.
        time.sleep(1)

    if not pages:
        return

    # DataFrame.append was removed in pandas 2.0; concatenate once instead.
    cept_detail = pd.concat(pages, ignore_index=True)
    cept_detail = cept_detail.rename(columns={'ts_code': 'stk_code', 'name': 'stk_name', 'concept_name': 'concept'})
    # Keep only the first six characters of the tushare code.
    cept_detail['stk_code'] = cept_detail['stk_code'].apply(lambda x: x[:6])
    cept_detail = cept_detail[['stk_code', 'stk_name', 'concept']]

    print(cept_detail.head(5))

    cept_detail_data = [
        (row['stk_code'], row['stk_name'], row['concept'])
        for _, row in cept_detail.iterrows()
    ]

    table_insert(cept_detail, 'concept_info', cept_detail_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:上证指数

# @retry(tries=-1, delay=5)
def sz_index_info():
    """Fetch the daily bar of index ``000001.SH`` and store it in ``sz_index``.

    Reads the module-level ``pro`` client and ``current`` trade date.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('sz_index_info函数开始执行')

    sz_index = pro.index_daily(ts_code='000001.SH', trade_date=current, fields='trade_date,close,open,high,low,pre_close,pct_chg,amount')
    sz_index = sz_index.rename(columns={'open': 'cur_open', 'close': 'cur_close'})
    for column in ('cur_open', 'high', 'low', 'cur_close', 'pre_close', 'pct_chg'):
        sz_index[column] = sz_index[column].apply(lambda x: round(x, 2))
    # The precision must be an argument of round(); the original passed the 2
    # to Series.apply, which rounded the amount to a whole number.
    sz_index['amount'] = sz_index['amount'].apply(lambda x: round(x / 10, 2))
    sz_index = sz_index[['trade_date', 'cur_open', 'high', 'low', 'cur_close', 'pre_close', 'pct_chg', 'amount']]

    print(sz_index.head(5))

    sz_index_data = [
        (row['trade_date'], row['cur_open'], row['high'], row['low'], row['cur_close'], row['pre_close'], row['pct_chg'], row['amount'])
        for _, row in sz_index.iterrows()
    ]

    table_insert(sz_index, 'sz_index', sz_index_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:银行拆借利率

# @retry(tries=-1, delay=5)
def on_shibor_rate():
    """Fetch the ``on`` SHIBOR column for ``current`` and store it in ``shibor_rate``.

    Reads the module-level ``pro`` client and ``current`` date.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('on_shibor_rate函数开始执行')
    rates = pro.shibor(date=current, fields='date,on')
    rates = rates.rename(columns={'date': 'trade_date', 'on': 'on_shibor'})
    rates['on_shibor'] = rates['on_shibor'].apply(lambda value: round(value, 2))
    rates = rates[['trade_date', 'on_shibor']]

    print(rates.head(5))

    rows = [(record['trade_date'], record['on_shibor']) for _, record in rates.iterrows()]

    table_insert(rates, 'shibor_rate', rows)


# -------------------------------------------------------------------------------------------------------------
# tushare:沪深股通汇总

# @retry(tries=-1, delay=5)
def hsgt_total():
    """Fetch the ``moneyflow_hsgt`` summary for ``current`` and store it in ``hsgt_fund``.

    Missing values are replaced by 0 and the three money columns are
    multiplied by 100 and rounded to one decimal before being written.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('hsgt_total函数开始执行')

    ordered_columns = ['trade_date', 'hgt', 'sgt', 'north_money']
    flows = pro.moneyflow_hsgt(trade_date=current, fields='trade_date,hgt,sgt,north_money')
    flows.fillna(0, inplace=True)
    for name in ordered_columns[1:]:
        flows[name] = flows[name].apply(lambda amount: round(amount * 100, 1))
    flows = flows[ordered_columns]

    print(flows.head(5))

    rows = [
        (record['trade_date'], record['hgt'], record['sgt'], record['north_money'])
        for _, record in flows.iterrows()
    ]

    table_insert(flows, 'hsgt_fund', rows)


# -------------------------------------------------------------------------------------------------------------
# tushare:沪深股通买入前十

# @retry(tries=-1, delay=5)
def hsgt_top_detail():
    """Fetch the ``hsgt_top10`` rows for ``current`` and store them in ``hsgt_top_detail``.

    Two pages of up to 3000 rows each are requested. ``amount``, ``buy`` and
    ``sell`` are divided by 10000 and rounded to two decimals.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('hsgt_top_detail函数开始执行')

    # 1. Request every page, then combine them in a single concat
    #    (DataFrame.append was removed in pandas 2.0).
    pages = [
        pro.hsgt_top10(trade_date=current, fields='trade_date,ts_code,amount,buy,sell', offset=offset, limit=3000)
        for offset in range(0, 6000, 3000)
    ]
    top10 = pd.concat(pages, ignore_index=True)

    top10 = top10.rename(columns={'ts_code': 'stk_code'})
    top10['stk_code'] = top10['stk_code'].apply(lambda x: x[:6])
    for column in ('amount', 'buy', 'sell'):
        top10[column] = top10[column].apply(lambda x: round(x / 10000, 2))
    top10 = top10[['trade_date', 'stk_code', 'amount', 'buy', 'sell']]

    print(top10.head(5))

    # 2. Turn every row into a tuple ordered like the table columns.
    top10_data = [
        (row['trade_date'], row['stk_code'], row['amount'], row['buy'], row['sell'])
        for _, row in top10.iterrows()
    ]

    table_insert(top10, 'hsgt_top_detail', top10_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:日线行情、基础指标、涨跌停价格

# @retry(tries=-1, delay=5)
def stock_daily():
    """Fetch daily bars, basic indicators and price limits, then store them.

    For the module-level ``current`` trade date this requests ``daily``,
    ``daily_basic`` and ``stk_limit`` (two pages of 3000 rows each), left-joins
    them on ``ts_code``/``trade_date``, rounds the numeric columns and writes
    the result to the ``stock_daily`` table.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('stock_daily函数开始执行')

    def fetch_pages(endpoint, fields):
        # DataFrame.append was removed in pandas 2.0, so gather the pages
        # first and concatenate them once.
        pages = [
            endpoint(trade_date=current, fields=fields, offset=offset, limit=3000)
            for offset in range(0, 6000, 3000)
        ]
        return pd.concat(pages, ignore_index=True)

    frame_1 = fetch_pages(pro.daily, 'ts_code,trade_date,open,high,low,close,pre_close,pct_chg,amount')
    frame_2 = fetch_pages(pro.daily_basic, 'ts_code,trade_date,turnover_rate_f,volume_ratio,pe_ttm,total_share,float_share')
    frame_3 = fetch_pages(pro.stk_limit, 'trade_date,ts_code,up_limit,down_limit')

    # Join the three result sets on stock code and trade date.
    frame_4 = pd.merge(frame_1, frame_2, how='left', on=['ts_code', 'trade_date'])
    daily = pd.merge(frame_4, frame_3, how='left', on=['ts_code', 'trade_date'])

    daily = daily.rename(columns={'ts_code': 'stk_code', 'open': 'cur_open', 'close': 'cur_close', 'turnover_rate_f': 'change_hands'})
    daily.fillna(0, inplace=True)
    daily['stk_code'] = daily['stk_code'].apply(lambda x: x[:6])

    two_decimal_columns = (
        'cur_open', 'high', 'low', 'cur_close', 'pre_close', 'pct_chg',
        'change_hands', 'volume_ratio', 'pe_ttm', 'total_share', 'float_share',
        'up_limit', 'down_limit',
    )
    for column in two_decimal_columns:
        daily[column] = daily[column].apply(lambda x: round(x, 2))
    # The precision must be an argument of round(); the original passed the 2
    # to Series.apply, which rounded the amount to a whole number.
    daily['amount'] = daily['amount'].apply(lambda x: round(x / 10, 2))

    ordered_columns = ['trade_date', 'stk_code', 'cur_open', 'high', 'low', 'cur_close', 'pre_close', 'pct_chg', 'amount',
                       'change_hands', 'volume_ratio', 'pe_ttm', 'total_share', 'float_share', 'up_limit', 'down_limit']
    daily = daily[ordered_columns]

    print(daily.head(5))

    daily_data = [tuple(row[name] for name in ordered_columns) for _, row in daily.iterrows()]

    table_insert(daily, 'stock_daily', daily_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:市场情绪数据计算

def scqx_detail(stock_daily):
    """Compute market-sentiment counters from daily bars and store one row.

    Args:
        stock_daily: DataFrame with the columns ``pct_chg``, ``cur_open``,
            ``cur_close``, ``pre_close``, ``high``, ``up_limit`` and
            ``down_limit`` (one row per stock).

    Returns:
        None. One row keyed by the module-level ``current`` trade date is
        inserted into ``market_sentiment``; failures are printed and rolled
        back rather than raised.
    """
    print('scqx_detail函数开始执行')
    company = 0  # number of stocks seen
    up = 0  # stocks with pct_chg >= 0
    down = 0  # stocks with pct_chg < 0
    up_limit = 0  # closed at the limit-up price
    down_limit = 0  # closed at the limit-down price
    zr_up_limit = 0  # closed at limit-up without opening there
    zr_down_limit = 0  # closed at limit-down without opening there
    up_d5 = 0  # pct_chg >= 5
    down_d5 = 0  # pct_chg <= -5
    high_open = 0  # opened more than 4% above the previous close
    low_open = 0  # opened more than 4% below the previous close
    zbs = 0  # touched limit-up intraday but closed below it
    czt = 0  # touched limit-up intraday

    # 1. Accumulate the counters row by row.
    for _, row in stock_daily.iterrows():
        company += 1
        if row['pct_chg'] >= 0:
            up += 1
        if row['pct_chg'] < 0:
            down += 1
        if row['cur_close'] == row['up_limit']:
            up_limit += 1
        if row['cur_close'] == row['down_limit']:
            down_limit += 1
        if row['cur_close'] == row['up_limit'] and row['cur_open'] != row['up_limit']:
            zr_up_limit += 1
        if row['cur_close'] == row['down_limit'] and row['cur_open'] != row['down_limit']:
            zr_down_limit += 1
        if row['pct_chg'] >= 5:
            up_d5 += 1
        if row['pct_chg'] <= -5:
            down_d5 += 1
        # Guard the denominators: a zero price (e.g. a filled-in missing value)
        # would otherwise produce inf and be miscounted as a gap, or raise.
        if row['pre_close'] > 0 and row['cur_open'] / row['pre_close'] > 1.04:
            high_open += 1
        if row['cur_open'] > 0 and row['pre_close'] / row['cur_open'] > 1.04:
            low_open += 1
        if row['high'] == row['up_limit'] and row['cur_close'] != row['up_limit']:
            zbs += 1
        if row['high'] == row['up_limit']:
            czt += 1

    values = (current, company, up, down, up_limit, down_limit, zr_up_limit, zr_down_limit, up_d5, down_d5, high_open, low_open, zbs, czt)
    print(*values)

    # 2. Store the row. The values are bound as query parameters instead of
    #    being spliced into the SQL text.
    szqx_sql = '''insert ignore into market_sentiment(trade_date,company,up,down,up_limit,down_limit,zr_up_limit,zr_down_limit,up_d5,down_d5,high_open,low_open,zbs,czt)
                  values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
    conn = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cursor = conn.cursor()
    try:
        cursor.execute(szqx_sql, values)
    except Exception as error:
        print('insert failed')
        print(error)
        conn.rollback()
    else:
        print('insert successful')
        conn.commit()
    finally:
        cursor.close()
        conn.close()


# -------------------------------------------------------------------------------------------------------------
# tushare:大单资金

# @retry(tries=-1, delay=5)
def big_moneyflow():
    """Fetch large and extra-large order money flow and store it in ``big_moneyflow``.

    Requests two pages of up to 3000 ``moneyflow`` rows for the module-level
    ``current`` trade date.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('big_moneyflow函数开始执行')

    # DataFrame.append was removed in pandas 2.0; gather pages and concat once.
    pages = [
        pro.moneyflow(trade_date=current, fields='ts_code,trade_date,buy_lg_amount,sell_lg_amount,buy_elg_amount,sell_elg_amount', offset=offset, limit=3000)
        for offset in range(0, 6000, 3000)
    ]
    big_money = pd.concat(pages, ignore_index=True)

    big_money = big_money.rename(columns={'ts_code': 'stk_code'})
    big_money['stk_code'] = big_money['stk_code'].apply(lambda x: x[:6])
    ordered_columns = ['trade_date', 'stk_code', 'buy_lg_amount', 'sell_lg_amount', 'buy_elg_amount', 'sell_elg_amount']
    big_money = big_money[ordered_columns]

    print(big_money.head(5))

    big_money_data = [tuple(row[name] for name in ordered_columns) for _, row in big_money.iterrows()]

    table_insert(big_money, 'big_moneyflow', big_money_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:涨停统计,单次最多请求1000条

# @retry(tries=-1, delay=5)
def up_down_limit_detail():
    """Fetch the ``limit_list`` rows for ``current`` and store them in ``up_down_limit_detail``.

    Two pages of up to 1000 rows each are requested. ``fd_amount`` is divided
    by 10000 and rounded to two decimals; ``strth`` is rounded to two decimals;
    the ``limit`` column is renamed to ``ud_limit``.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('up_down_limit_detail函数开始执行')

    # 1. Request every page, then combine them in a single concat
    #    (DataFrame.append was removed in pandas 2.0).
    pages = [
        pro.limit_list(trade_date=current, fields='trade_date,ts_code,fd_amount,first_time,last_time,open_times,strth,limit', offset=offset, limit=1000)
        for offset in range(0, 2000, 1000)
    ]
    up_down_limit = pd.concat(pages, ignore_index=True)

    up_down_limit = up_down_limit.rename(columns={'ts_code': 'stk_code', 'limit': 'ud_limit'})
    up_down_limit['stk_code'] = up_down_limit['stk_code'].apply(lambda x: x[:6])
    up_down_limit['fd_amount'] = up_down_limit['fd_amount'].apply(lambda x: round(x / 10000, 2))
    up_down_limit['strth'] = up_down_limit['strth'].apply(lambda x: round(x, 2))
    ordered_columns = ['trade_date', 'stk_code', 'fd_amount', 'first_time', 'last_time', 'open_times', 'strth', 'ud_limit']
    up_down_limit = up_down_limit[ordered_columns]

    print(up_down_limit.head(5))

    # 2. Turn every row into a tuple ordered like the table columns.
    up_down_limit_data = [tuple(row[name] for name in ordered_columns) for _, row in up_down_limit.iterrows()]

    table_insert(up_down_limit, 'up_down_limit_detail', up_down_limit_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:融资融券汇总

# @retry(tries=-1, delay=5)
def margin_info():
    """Fetch yesterday's margin summary and store it in ``margin``.

    The query date is the calendar day before today, formatted ``YYYYMMDD``.
    ``rzye`` and ``rqye`` are divided by 10000 and rounded to two decimals,
    then missing values are replaced by 0.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('margin_info函数开始执行')
    previous_day = datetime.date.today() - datetime.timedelta(days=1)
    yesterday = previous_day.strftime('%Y%m%d')

    summary = pro.margin(trade_date=yesterday, fields='trade_date,exchange_id,rzye,rqye')
    for balance in ('rzye', 'rqye'):
        summary[balance] = summary[balance].apply(lambda amount: round(amount / 10000, 2))
    summary.fillna(0, inplace=True)
    summary = summary[['trade_date', 'exchange_id', 'rzye', 'rqye']]

    print(summary.head(5))

    rows = [
        (record['trade_date'], record['exchange_id'], record['rzye'], record['rqye'])
        for _, record in summary.iterrows()
    ]

    table_insert(summary, 'margin', rows)


# -------------------------------------------------------------------------------------------------------------
# tushare:融资融券明细

# @retry(tries=-1, delay=5)
def margin_detail():
    """Fetch yesterday's per-stock margin balances and store them in ``margin_detail``.

    The query date is the calendar day before today, formatted ``YYYYMMDD``.
    Two pages of up to 3000 rows each are requested. ``rzye`` and ``rqye`` are
    divided by 10000 and rounded to two decimals, then missing values are
    replaced by 0.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('margin_detail函数开始执行')
    yesterday = datetime.date.today() + datetime.timedelta(-1)
    yesterday = str(yesterday).replace('-', '')

    # DataFrame.append was removed in pandas 2.0; gather pages and concat once.
    pages = [
        pro.margin_detail(trade_date=yesterday, fields='trade_date,ts_code,name,rzye,rqye', offset=offset, limit=3000)
        for offset in range(0, 6000, 3000)
    ]
    detail = pd.concat(pages, ignore_index=True)

    print(detail.head(5))

    detail = detail.rename(columns={'ts_code': 'stk_code', 'name': 'stk_name'})
    detail['stk_code'] = detail['stk_code'].apply(lambda x: x[:6])
    detail['rzye'] = detail['rzye'].apply(lambda x: round(x / 10000, 2))
    detail['rqye'] = detail['rqye'].apply(lambda x: round(x / 10000, 2))
    detail.fillna(0, inplace=True)
    detail = detail[['trade_date', 'stk_code', 'stk_name', 'rzye', 'rqye']]

    print(detail.head(5))

    detail_data = [
        (row['trade_date'], row['stk_code'], row['stk_name'], row['rzye'], row['rqye'])
        for _, row in detail.iterrows()
    ]

    table_insert(detail, 'margin_detail', detail_data)


# -------------------------------------------------------------------------------------------------------------
# tushare:股东人数

# @retry(tries=-1, delay=5)
def holder_number():
    """Fetch shareholder counts dated ``current`` and store them in ``holder_number``.

    Requests ``stk_holdernumber`` at ten offsets (0, 6000, ..., 54000) with
    ``start_date`` and ``end_date`` both set to the module-level ``current``.
    The original also requested ``stock_basic`` but never used the result, so
    that call has been dropped.

    Returns:
        None. Rows are written through ``table_insert``.
    """
    print('holder_number函数开始执行')

    pages = []
    for offset in range(0, 60000, 6000):
        pages.append(pro.stk_holdernumber(start_date=current, end_date=current, fields='ts_code,end_date,holder_num', offset=offset))
        # Pause between requests -- presumably to respect the API rate limit.
        time.sleep(1)

    # DataFrame.append was removed in pandas 2.0; concatenate once instead.
    holder_num = pd.concat(pages, ignore_index=True)

    holder_num = holder_num.rename(columns={'ts_code': 'stk_code'})
    holder_num.fillna(0, inplace=True)
    holder_num['stk_code'] = holder_num['stk_code'].apply(lambda x: x[:6])
    holder_num = holder_num[['end_date', 'stk_code', 'holder_num']]

    print(holder_num.head(5))

    holder_num_data = [
        (row['end_date'], row['stk_code'], row['holder_num'])
        for _, row in holder_num.iterrows()
    ]

    table_insert(holder_num, 'holder_number', holder_num_data)


# -------------------------------------------------------------------------------------------------------------
# 东方财富:个股盘口异动数据

def eastmoney_yd_spider(current):
    """Scrape Eastmoney intraday stock-change records and store them in ``yd_detail``.

    Args:
        current: Trade date stored in the ``trade_date`` column of every row.

    Returns:
        None. Rows are inserted with ``insert ignore``; failures are printed
        and rolled back rather than raised.
    """
    print('eastmoney_yd_spider函数开始执行')

    header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3676.400 QQBrowser/10.5.3738.400"}
    url = "http://push2ex.eastmoney.com/getAllStockChanges?type=8201,8202,8193,4,32,64,8207,8209,8211,8213,8215,8204,8203,8194,8,16,128,8208,8210,8212,8214,8216"

    # 1-2. Request up to 600 pages with the module-level ``session`` and stop
    #      at the first page whose ``data`` is null.
    content_list = []
    for page in range(0, 600):
        param = {"pageindex": page, "pagesize": '64', "ut": '7eea3edcaed734bea9cbfc24409ed989', "dpt": 'wzchanges'}
        content = json.loads(session.get(url=url, params=param, headers=header).text)
        if content['data'] is None:
            break
        content_list.append(content)

    # 3. Parse the records.
    # NOTE(review): the type checks compare against strings; confirm that the
    # API really returns ``t`` as a string, otherwise no value is ever scaled.
    times_100_types = ('8201', '8202', '8207', '8209', '8211', '8215', '8204', '8203', '8208', '8210', '8212', '8216')
    div_10000_types = ('8193', '8194', '128', '64')
    content_list_data = []
    for content in content_list:
        for stock in content['data']['allstock']:
            stk_code = stock['c']  # stock code
            stk_name = stock['n']  # stock name
            chg_time = stock['tm']  # time of the change
            chg_type = stock['t']  # type of the change
            if chg_type in times_100_types:
                chg_value = stock['i'] * 100
            elif chg_type in div_10000_types:
                chg_value = stock['i'] / 10000
            else:
                chg_value = stock['i']

            # Record every entry. The original appended inside the ``else``
            # branch only, silently dropping all scaled records.
            content_list_data.append((current, stk_code, stk_name, chg_time, chg_type, chg_value))

    # 4. Store the rows.
    conn = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cursor = conn.cursor()
    try:
        content_list_sql = '''insert ignore into yd_detail(trade_date,stk_code,stk_name,chg_time,chg_type,chg_value) values(%s,%s,%s,%s,%s,%s)'''
        cursor.executemany(content_list_sql, content_list_data)
    except Exception as error:
        print('insert failed')
        print(error)
        conn.rollback()
    else:
        print('insert successful!')
        conn.commit()
    finally:
        cursor.close()
        conn.close()


# -------------------------------------------------------------------------------------------------------------
# 开盘啦:龙虎榜风口概念数据

def fengkouKpl(current):
    """Scrape the Kaipanla ``GetStockList`` board and store each stock's ``FengKou`` tag.

    Args:
        current: Trade date as ``YYYYMMDD``; it is reformatted to
            ``YYYY-MM-DD`` for both the request and the stored rows.

    Returns:
        list: The ``ID`` of every stock in the response, in response order.
    """
    print('fengkouKpl函数开始执行')
    current = current[:4] + '-' + current[4:6] + '-' + current[6:]
    print(current)

    # 1. Request the list with the module-level ``session``. The original also
    #    created a second, unused requests session here that was never closed.
    url = 'https://pclhb.kaipanla.com/w1/api/index.php'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0'}
    data = {'c': 'LongHuBang', 'a': 'GetStockList', 'st': 300, 'Time': str(current), 'UserID': 399083, 'Token': '71aef0e806e61ad3169ddc9473e37886'}

    html = json.loads(session.post(url=url, headers=headers, data=data).text)['list']

    # 2. Parse the response.
    code_list = []
    fengkou_list = []
    for j in html:
        stk_code = j['ID']
        stk_name = j['Name']
        # Only the last comma-separated entry is kept, as in the original loop.
        raw_fengkou = j['FengKou']
        fengkou = raw_fengkou.split(',')[-1] if raw_fengkou else ''

        code_list.append(stk_code)
        fengkou_list.append((current, stk_code, stk_name, fengkou))

    # Print once; the original dumped the whole growing list on every iteration.
    print(fengkou_list)

    # 3. Store the rows. Plain ``%s`` placeholders let pymysql batch the rows
    #    into a single multi-row insert.
    conn = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cursor = conn.cursor()
    try:
        fengkou_list_sql = '''insert ignore into fengkou_concept(trade_date, stk_code, stk_name, fengkou) values(%s, %s, %s, %s)'''
        cursor.executemany(fengkou_list_sql, fengkou_list)
    except Exception as error:
        print('insert failed')
        print(error)
        conn.rollback()
    else:
        print('insert successful!')
        conn.commit()
    finally:
        cursor.close()
        conn.close()

    return code_list


# -------------------------------------------------------------------------------------------------------------
# 开盘啦:龙虎榜营业部标签数据

def departKpl(current, code_list):
    """Scrape the Kaipanla buy/sell seat list of each stock and store seat tags.

    Args:
        current: Trade date as ``YYYYMMDD``; it is reformatted to
            ``YYYY-MM-DD`` for both the requests and the stored rows.
        code_list: Iterable of stock ids, each sent as ``StockID``.

    Returns:
        None. For every stock the distinct ``(date, name, tag, classify)``
        rows are inserted into ``depart_detail``; failures are printed and
        rolled back rather than raised.
    """
    print('departKpl函数开始执行')
    current = current[:4] + '-' + current[4:6] + '-' + current[6:]
    print(current)

    url = 'https://pclhb.kaipanla.com/w1/api/index.php'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0'}
    # Seats with a fixed label: seat name -> (tag, classify).
    fixed_labels = {
        '沪股通专用': ('沪股通', 5),
        '深股通专用': ('深股通', 6),
        '机构专用': ('机构', 4),
    }

    for code in code_list:
        # 1. Request the seat list with the module-level ``session``. The
        #    original also opened a second, unused requests session per stock.
        data = {'c': 'Stock', 'a': 'GetNewOneStockInfo', 'StockID': code, 'Time': current, 'UserID': '399083', 'Token': '71aef0e806e61ad3169ddc9473e37886'}
        html = json.loads(session.post(url=url, headers=headers, data=data).text)['List'][0]
        seats = html['BuyList'] + html['SellList']

        # 2. Parse the seats.
        depart_list = []
        for depart in seats:
            depart_name = depart['Name']
            if depart_name in fixed_labels:
                tag, classify = fixed_labels[depart_name]
            else:
                try:
                    tag = depart['GroupIcon'][0]
                except (KeyError, IndexError, TypeError):
                    # No usable group icon: store an empty tag.
                    tag = ''
                classify = depart['YouZiIcon']

            depart_list.append((current, depart_name, tag, classify))

        # Drop duplicates while keeping a deterministic insertion order.
        new_depart_list = list(dict.fromkeys(depart_list))

        # 3. Store the rows. Plain ``%s`` placeholders let pymysql batch the
        #    rows into a single multi-row insert.
        conn = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
        cursor = conn.cursor()
        try:
            new_depart_list_sql = '''insert ignore into depart_detail(trade_date, depart_name, tag, classify) values(%s, %s, %s, %s)'''
            cursor.executemany(new_depart_list_sql, new_depart_list)
        except Exception as error:
            print('insert failed')
            print(error)
            conn.rollback()
        else:
            print('insert successful!')
            conn.commit()
        finally:
            cursor.close()
            conn.close()


# -------------------------------------------------------------------------------------------------------------
# 开盘啦:精选概念

# @retry(tries=-1, delay=5)
def kpl_select_concept_spider(code, current):
    """Scrape one stock's tag and concept plates from Kaipanla and store them.

    Args:
        code: Stock code; only its first six characters are sent as ``StockID``.
        current: Trade date as ``YYYYMMDD``; it is reformatted to
            ``YYYY-MM-DD`` for the request.

    Returns:
        None. The tag goes to ``stock_tag`` and the concepts to
        ``concept_info``, both with ``insert ignore``; failures are printed
        and rolled back rather than raised.
    """
    print('kpl_select_concept_spider函数开始执行')
    current = current[:4] + '-' + current[4:6] + '-' + current[6:]

    # 1. Request the page.
    url = 'https://pchq.kaipanla.com/w1/api/index.php'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0'}
    data = {'c': 'PCArrangeData',
            'a': 'GetHQPlate',
            'StockID': code[:6],
            'Day': current,
            'time': '15:00',
            'SelType': '1, 2, 3, 8, 9, 5, 6, 7',
            'UserID': 399083,
            'Token': '71aef0e806e61ad3169ddc9473e37886'}

    # The context manager closes the per-call session; the original leaked it.
    with requests.Session() as http:
        html = http.post(url=url, data=data, headers=headers).text
    time.sleep(1)

    # 2. Parse the response once instead of once per field.
    payload = json.loads(html)
    stk_code = payload['trend']['code']
    stk_name = payload['pankou']['name']
    stock_tag = payload['pankou']['tag']

    # A literal ``False`` tag is skipped; any other value is stored.
    tag = []
    if stock_tag is not False:
        tag.append((stk_code, stk_name, stock_tag))

    select_concept = [(stk_code, stk_name, cp[0]) for cp in payload['stockplate']]

    # 3. Store the rows.
    conn = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cursor = conn.cursor()
    try:
        tag_sql = '''insert ignore into stock_tag(stk_code,stk_name,stock_tag) values(%s,%s,%s)'''
        cursor.executemany(tag_sql, tag)

        select_concept_sql = '''insert ignore into concept_info(stk_code,stk_name,concept) values(%s,%s,%s)'''
        cursor.executemany(select_concept_sql, select_concept)
    except Exception as error:
        print('insert failed')
        print(error)
        conn.rollback()
    else:
        print('insert successful')
        conn.commit()
    finally:
        cursor.close()
        conn.close()


# -------------------------------------------------------------------------------------------------------------
# 网易:长期阶段涨幅数据

def _jdzf_period_percent(record, period_key):
    """Return ``record['LONG_PERIOD_RANK'][period_key] * 100`` rounded to 2 places, or 0 if unavailable."""
    try:
        return round(record['LONG_PERIOD_RANK'][period_key] * 100, 2)
    except (KeyError, TypeError):
        # The period is missing or not numeric for this stock.
        return 0


def wangyi_jdzf():
    """Scrape the 163.com long-period ranking and replace ``stage_increase`` with it.

    Pages of 25 rows are requested with the module-level ``session`` until an
    empty page is returned (at most 999 pages). The table is emptied and
    refilled in one transaction. When nothing was fetched the table is left
    untouched instead of being wiped.

    Returns:
        None. Failures are printed and rolled back rather than raised.
    """
    print('wangyi_jdzf函数开始执行')

    # 1. Request the pages.
    html_list = []
    url = 'http://quotes.money.163.com/hs/realtimedata/service/rank.php?'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0'}

    d = datetime.datetime.now()
    req = str(d.weekday()) + str(d.hour) + str(d.minute)

    # The response is JSONP: strip the callback wrapper before parsing.
    jsonp_prefix = 'callback_51322721('
    jsonp_suffix = ']})'

    for page in range(1, 1000):
        param = {
            'host': '/hs/realtimedata/service/rank.php',
            'page': page,
            'query': 'LONG_PERIOD_RANK:_exists_',
            'fields': 'RN,CODE,SYMBOL,NAME,PRICE,LONG_PERIOD_RANK,PERCENT',
            'sort': 'LONG_PERIOD_RANK.MONTH_PERCENT',
            'order': 'desc',
            'count': 25,
            'type': 'query',
            'callback': 'callback_51322721',
            'req': req
        }

        html = session.get(url=url, headers=headers, params=param).text
        html = html.replace(jsonp_prefix, '')
        html = html.replace(jsonp_suffix, ']}')

        rows = json.loads(html)['list']
        if not rows:
            break
        html_list.append(rows)

    # 2. Parse the records.
    period_keys = ('WEEK_PERCENT', 'MONTH_PERCENT', 'QUARTER_PERCENT', 'HALF_YEAR_PERCENT', 'YEAR_PERCENT')
    jdzf_list = []
    for p in html_list:
        for j in p:
            print(j)
            stk_code = j['SYMBOL']
            stk_name = j['NAME']
            cur_close = j['PRICE']
            pct_chg = j['PERCENT'] * 100
            print(pct_chg)

            period_values = tuple(_jdzf_period_percent(j, key) for key in period_keys)
            jdzf_list.append((stk_code, stk_name, cur_close, pct_chg) + period_values)

    if not jdzf_list:
        # Nothing was fetched: keep the existing rows rather than deleting
        # everything and inserting nothing.
        print('insert failed')
        print('no stage increase data fetched')
        return

    # 3. Replace the table content.
    conn = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cursor = conn.cursor()
    try:
        cursor.execute('''delete from stage_increase''')

        jdzf_list_sql = '''insert ignore into stage_increase(stk_code,stk_name,cur_close,pct_chg,w_pct_chg,m_pct_chg,q_pct_chg,hy_pct_chg,y_pct_chg) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
        cursor.executemany(jdzf_list_sql, jdzf_list)
    except Exception as error:
        print('insert failed')
        print(error)
        conn.rollback()
    else:
        print('insert successful')
        conn.commit()
    finally:
        cursor.close()
        conn.close()


# -------------------------------------------------------------------------------------------------------------
# 网易:连续上涨数据

def wangyi_lxsz():
    """Scrape NetEase's consecutive-rise stock list and store it in MySQL.

    Pages through the ``marketIndexes.php`` endpoint (25 rows requested per
    page) until a page with an empty ``list`` comes back, then bulk-inserts
    one row per stock into the ``continuous_up_day`` table.

    Relies on two module-level names that are not defined in this function:
    ``session`` (used for the HTTP GET) and ``current`` (stored as
    ``trade_date``).
    """
    print('wangyi_lxsz函数开始执行')

    # 1. Fetch the raw pages
    endpoint = 'http://quotes.money.163.com/hs/realtimedata/service/marketIndexes.php?'
    request_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0'}
    jsonp_open = 'callback_1342448884('
    jsonp_close = ']});'

    now = datetime.datetime.now()
    # "req" is the weekday, hour and minute concatenated without padding.
    req_token = ''.join(str(part) for part in (now.weekday(), now.hour, now.minute))

    pages = []
    for page_no in range(1, 1000):
        query = {
            'host': '/hs/realtimedata/service/marketIndexes.php',
            'page': page_no,
            'query': 'CONTINUOUS_UPDOWN.RIGHT_NOW:int_1',
            'fields': 'CONTINUOUS_UPDOWN,CODE,RN,SYMBOL,NAME,TYPE,PRICE,PERCENT',
            'sort': 'CONTINUOUS_UPDOWN.DAYS',
            'order': 'desc',
            'count': 25,
            'type': 'query',
            'callback': 'callback_1342448884',
            'req': req_token
        }

        body = session.get(url=endpoint, headers=request_headers, params=query).text
        # Strip the JSONP wrapper so the payload parses as plain JSON.
        body = body.replace(jsonp_open, '').replace(jsonp_close, ']}')

        rows = json.loads(body)['list']
        if len(rows) == 0:
            break
        pages.append(rows)

    # 2. Flatten the pages into insert tuples
    records = [
        (current, item['SYMBOL'], item['NAME'], item['PRICE'], item['PERCENT'], item['CONTINUOUS_UPDOWN']['DAYS'])
        for rows in pages
        for item in rows
    ]

    # 3. Store the rows
    insert_sql = '''insert ignore into continuous_up_day(trade_date,stk_code,stk_name,cur_close,pct_chg,up_days) values(%s,%s,%s,%s,%s,%s)'''
    db = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cur = db.cursor()
    try:
        cur.executemany(insert_sql, records)
    except Exception as exc:
        print('insert failed')
        print(exc)
        db.rollback()
    else:
        print('insert successful')
        db.commit()
    cur.close()
    db.close()


# -------------------------------------------------------------------------------------------------------------
# 网易:连续下跌数据

def wangyi_lxxd():
    """Scrape NetEase's consecutive-fall stock list and store it in MySQL.

    Pages through the ``marketIndexes.php`` endpoint (25 rows requested per
    page) until a page with an empty ``list`` comes back, then bulk-inserts
    one row per stock into the ``continuous_down_day`` table.

    Relies on two module-level names that are not defined in this function:
    ``session`` (used for the HTTP GET) and ``current`` (stored as
    ``trade_date``).
    """
    print('wangyi_lxxd函数开始执行')

    # 1. Fetch the raw pages
    endpoint = 'http://quotes.money.163.com/hs/realtimedata/service/marketIndexes.php?'
    request_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0'}
    jsonp_open = 'callback_1536277048('
    jsonp_close = ']});'

    now = datetime.datetime.now()
    # "req" is the weekday, hour and minute concatenated without padding.
    req_token = ''.join(str(part) for part in (now.weekday(), now.hour, now.minute))

    pages = []
    for page_no in range(1, 1000):
        query = {
            'host': '/hs/realtimedata/service/marketIndexes.php',
            'page': page_no,
            'query': 'CONTINUOUS_UPDOWN.RIGHT_NOW:int_-1',
            'fields': 'CONTINUOUS_UPDOWN,CODE,RN,SYMBOL,NAME,TYPE,PRICE,PERCENT',
            'sort': 'CONTINUOUS_UPDOWN.DAYS',
            'order': 'desc',
            'count': 25,
            'type': 'query',
            'callback': 'callback_1536277048',
            'req': req_token
        }

        body = session.get(url=endpoint, headers=request_headers, params=query).text
        # Strip the JSONP wrapper so the payload parses as plain JSON.
        body = body.replace(jsonp_open, '').replace(jsonp_close, ']}')

        rows = json.loads(body)['list']
        if len(rows) == 0:
            break
        pages.append(rows)

    # 2. Flatten the pages into insert tuples
    records = [
        (current, item['SYMBOL'], item['NAME'], item['PRICE'], item['PERCENT'], item['CONTINUOUS_UPDOWN']['DAYS'])
        for rows in pages
        for item in rows
    ]

    # 3. Store the rows
    insert_sql = '''insert ignore into continuous_down_day(trade_date,stk_code,stk_name,cur_close,pct_chg,down_days) values(%s,%s,%s,%s,%s,%s)'''
    db = pymysql.connect(host='localhost', user='root', passwd='123456', db='quant', charset='utf8mb4')
    cur = db.cursor()
    try:
        cur.executemany(insert_sql, records)
    except Exception as exc:
        print('insert failed')
        print(exc)
        db.rollback()
    else:
        print('insert successful')
        db.commit()
    cur.close()
    db.close()


# -------------------------------------------------------------------------------------------------------------
# jquant:竞价明细

# @retry(tries=-1, delay=5)
def stock_bidding(current):
    """Fetch one day's call-auction data for every stock and store it.

    Args:
        current: Trade date as a ``YYYYMMDD`` string; it is reformatted to
            ``YYYY-MM-DD`` before being passed to the data API.

    Returns:
        None. Rows are written to the ``stock_bidding`` table through
        ``table_insert``; nothing is inserted when no security returns data.
    """
    print('stock_bidding函数开始执行')
    trade_day = current[:4] + '-' + current[4:6] + '-' + current[6:]
    securities = get_all_securities(types=['stock'], date=None).index.tolist()

    # 1. Request the data, one security at a time
    frames = []
    for stk in securities:
        frame = get_call_auction(security=stk, start_date=trade_day, end_date=trade_day, fields=['time', 'current', 'money'])
        if frame is not None and not frame.empty:
            frames.append(frame)

    if not frames:
        # pd.concat raises on an empty list, and there is nothing to store anyway.
        print('stock_bidding: no call auction data for ' + trade_day)
        return

    # A single concat replaces the per-iteration DataFrame.append, which was
    # quadratic and no longer exists in pandas >= 2.0.
    bidding = pd.concat(frames, ignore_index=True)
    bidding = bidding.rename(columns={'code': 'stk_code', 'time': 'trade_date', 'current': 'open_price', 'money': 'amount'})
    bidding = bidding[['trade_date', 'stk_code', 'open_price', 'amount']].fillna(0)
    # Keep only the date part of the timestamp, formatted as YYYYMMDD.
    bidding['trade_date'] = bidding['trade_date'].apply(lambda x: (str(x).replace('-', ''))[:8])
    bidding['stk_code'] = bidding['stk_code'].apply(lambda x: x[:6])
    # Scale the amount down by 10,000 and keep two decimals.
    bidding['amount'] = bidding['amount'].apply(lambda x: round(x / 10000, 2))

    print(bidding.head(5))

    # 2. Turn the frame into insert tuples
    rows = [
        (row['trade_date'], row['stk_code'], row['open_price'], row['amount'])
        for _, row in bidding.iterrows()
    ]

    table_insert(bidding, 'stock_bidding', rows)


# -------------------------------------------------------------------------------------------------------------
# jquant:龙虎榜明细

# @retry(tries=-1, delay=5)
def top_inst_info(current):
    """Fetch one day's billboard list and store it in ``top_inst_detail``.

    Args:
        current: Trade date as a ``YYYYMMDD`` string; it is reformatted to
            ``YYYY-MM-DD`` before being passed to ``get_billboard_list``.

    The department name has several company suffixes stripped, and the four
    value columns are divided by 10,000 and rounded to one decimal before
    the rows are handed to ``table_insert``.
    """
    print('top_inst_info函数开始执行')
    day = '-'.join((current[:4], current[4:6], current[6:]))

    source_columns = ['day', 'code', 'direction', 'sales_depart_name', 'rank', 'buy_value', 'sell_value', 'net_value', 'amount']
    output_columns = ['trade_date', 'ts_code', 'direction', 'depart_name', 'rank', 'buy_value', 'sell_value', 'net_value', 'amount']

    def strip_suffixes(name):
        # Removal order matters: the longer company suffixes go before '有限公司'.
        for suffix in ('证券营业部', '股份有限公司', '有限责任公司', '有限公司'):
            name = name.replace(suffix, '')
        return name

    def scale_down(value):
        return round(value / 10000, 1)

    # 1. Request and normalise the data
    board = get_billboard_list(stock_list=None, start_date=day, end_date=day)
    board = board[source_columns]
    board = board.rename(columns={'code': 'ts_code', 'day': 'trade_date', 'sales_depart_name': 'depart_name'})
    board['ts_code'] = board['ts_code'].apply(lambda code: code[:6])
    board['depart_name'] = board['depart_name'].fillna(value='').apply(strip_suffixes)
    for column in ('buy_value', 'sell_value', 'net_value', 'amount'):
        board[column] = board[column].fillna(value=0).apply(scale_down)
    board = board[output_columns]

    print(board.head(5))

    # 2. Turn the frame into insert tuples
    records = [
        tuple(row[column] for column in output_columns)
        for _, row in board.iterrows()
    ]

    table_insert(board, 'top_inst_detail', records)


# -------------------------------------------------------------------------------------------------------------
# 数据库数据清洗
def data_clear():
    """Placeholder for database clean-up; currently only prints a start message."""
    print('data_clear函数开始执行')


# -------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
    # Everything here stays at module level on purpose: the functions above
    # read ``pro``, ``session`` and ``current`` as module globals.

    # NOTE(review): the API token and the account credentials below are
    # hard-coded in source; consider loading them from the environment.
    pro = ts.pro_api('ac16b470869c5d82db5033ae9288f77b282d2b5519507d6d2c72fdd7')

    auth('18829345691', '345691')
    # Stored under its own name so the ``is_auth`` function is not shadowed.
    auth_ok = is_auth()

    t1 = time.time()

    # Trade dates to process. ``current`` is bound by the loop below, so no
    # separate initial value is needed.
    trade_dates = ['20200206', '20200207', '20200210', '20200211', '20200212', '20200213', '20200214', '20200217', '20200218', '20200219', '20200220', '20200221', '20200224', '20200225', '20200226', '20200227', '20200228', '20200302', '20200303', '20200304', '20200305', '20200306', '20200309', '20200310', '20200311', '20200312', '20200313', '20200316', '20200317', '20200318',
                   '20200319', '20200320', '20200323', '20200324', '20200325', '20200326', '20200327', '20200330', '20200331']
    for current in trade_dates:
        # tushare API calls
        # Results get their own names: rebinding ``stock_list`` / ``stock_daily``
        # to their return values made the second iteration call a non-callable.
        stk_list = stock_list()
        cept_id = concept_classify()
        sz_index_info()
        on_shibor_rate()
        hsgt_total()
        hsgt_top_detail()
        daily_quotes = stock_daily()
        scqx_detail(daily_quotes)
        big_moneyflow()
        up_down_limit_detail()
        margin_info()
        margin_detail()

        session = requests.Session()

        # Eastmoney spider
        eastmoney_yd_spider()

        # Kaipanla spider
        code_list = fengkouKpl(current)
        departKpl(current, code_list)

        conn = Redis(host='127.0.0.1', port=6379)
        pool = Pool(8)
        for code in stk_list:
            # sadd returns 1 only when the code was not already in the set.
            ex = conn.sadd('code', code)
            if ex == 1:
                print('数据没有被爬取过,可以进行数据的爬取')
                try:
                    pool.apply_async(func=kpl_select_concept_spider, args=(code, current))
                except Exception as error:
                    print(error)
            else:
                print('数据还没有更新,暂无新数据可爬取!')

        pool.close()
        pool.join()

        # NetEase spiders
        wangyi_jdzf()
        wangyi_lxsz()
        wangyi_lxxd()

        # JoinQuant API calls
        stock_bidding(current)
        top_inst_info(current)

    # tushare API calls that take a long time; run once after all dates
    holder_number()
    concept_detail(cept_id)

    t2 = time.time()

    print('本次共耗费%.2f' % (t2 - t1))
# Original article (原文地址): https://www.cnblogs.com/Iceredtea/p/12300728.html