# do_sings_user_threading.py

#!/usr/bin/env python
from struct import *
import pandas as pd
import numpy as np
import os
import re
import pathlib

import threading
import time




def get_new_data(pathdir=r"G:\datas of status\python codes\everyday_data"):
    """Load every ``.csv`` file in *pathdir* into one DataFrame.

    The files are read in sorted filename order (so "last" is well defined
    for callers that de-duplicate with ``keep="last"``) and concatenated
    without renumbering the per-file row index.

    Two columns are then rewritten / added:

    * ``name``  -- every "SZ" is replaced by "0_", every "SH" by "1_", and
      ".dat" is appended.
    * ``begin`` -- the rewritten ``name`` followed by ``date`` as text.

    Args:
        pathdir: Directory holding the new raw data files. Defaults to the
            location the script has always used.

    Returns:
        pandas.DataFrame with the combined rows plus the ``begin`` column.

    Raises:
        FileNotFoundError: if *pathdir* contains no ``.csv`` file.
        KeyError: if the CSV data has no ``name`` or ``date`` column.
    """
    # endswith() instead of the regex '.csv$': the unescaped dot also matched
    # names such as "backupcsv".
    csv_names = sorted(f for f in os.listdir(pathdir) if f.endswith(".csv"))
    if not csv_names:
        raise FileNotFoundError("no .csv files found in %r" % (pathdir,))

    frames = []
    for csv_name in csv_names:
        file = os.path.join(pathdir, csv_name)
        print(file)
        frames.append(pd.read_csv(file))

    # One concat instead of repeated DataFrame.append (removed in pandas 2.0).
    df_all = pd.concat(frames)

    df_all["name"] = (
        df_all["name"]
        .str.replace("SZ", "0_", regex=False)
        .str.replace("SH", "1_", regex=False)
        + ".dat"
    )
    df_all["begin"] = df_all["name"].map(str) + df_all["date"].map(str)
    return df_all





def read_file_out(file):
    """Read a binary signal file back into a DataFrame.

    The file is treated as a sequence of 8-byte records, each an unsigned
    32-bit integer followed by a 32-bit float (native byte order). Trailing
    bytes that do not make up a whole record are ignored.

    Args:
        file: Path of the binary file to read.

    Returns:
        pandas.DataFrame with columns ``date`` (the integer) and ``content``
        (the float), one row per record, indexed 0..n-1. An empty file gives
        an empty frame that still has both columns.
    """
    record_format = "=If"
    record_size = calcsize(record_format)

    # Context manager so the handle is closed even if reading fails.
    with open(file, mode='rb') as fh:
        buf = fh.read()

    # iter_unpack requires a whole number of records; drop any partial tail.
    usable = len(buf) - len(buf) % record_size
    records = list(iter_unpack(record_format, buf[:usable]))

    # Build the frame once instead of appending one single-row frame per
    # record (quadratic, and DataFrame.append was removed in pandas 2.0).
    return pd.DataFrame(records, columns=["date", "content"])


def chang_to_bin(filew, df_all, mode="ab"):
    """Write the ``date``/``content`` rows of *df_all* to a binary file.

    Each row becomes one 8-byte record: ``date`` as an unsigned 32-bit
    integer followed by ``content`` as a 32-bit float (native byte order).

    Args:
        filew: Path of the file to write.
        df_all: DataFrame with ``date`` and ``content`` columns. Rows are
            written in their current order; the index labels are irrelevant.
        mode: File mode passed to ``open``. ``"ab"`` (default) appends to
            existing records, ``"wb"`` overwrites the file.

    Raises:
        ValueError / struct.error: if a ``date`` cannot be converted to an
            unsigned 32-bit integer or a ``content`` to a float. Nothing is
            written in that case.
    """
    # Pack everything first so a bad row cannot leave a half-written file,
    # and iterate the columns directly rather than via .loc[i], which only
    # worked when the index was exactly 0..n-1.
    payload = b"".join(
        pack("=If", int(date), float(content))
        for date, content in zip(df_all["date"], df_all["content"])
    )
    with open(filew, mode=mode) as fw:
        fw.write(payload)


# Full mapping from data-column name to output sub-directory name.
# Not referenced by the code visible in this file.
dir_dict2 = dict(
    z_xiao="signals_user_41",
    z_zhong="signals_user_42",
    z_da="signals_user_43",
    z_te_da="signals_user_44",
    f_xiao="signals_user_45",
    f_zhong="signals_user_46",
    f_da="signals_user_47",
    f_te_da="signals_user_48",
)

# The subset of columns that dowork() actually exports, in export order.
dir_dict = {
    key: dir_dict2[key]
    for key in ("z_da", "z_te_da", "f_da", "f_te_da")
}




def dowork(df_only_name):
    """Export the new data of each given name to its binary signal files.

    For every name in *df_only_name* and every ``(column, sub_dir)`` pair in
    ``dir_dict``, the rows of ``df_new_data`` for that name are reduced to
    ``date`` plus that column (renamed to ``content``), de-duplicated on
    ``date`` keeping the last row, and handed to ``chang_to_bin`` for the
    file ``<pathdir>/<sub_dir>/<name>``.

    Reads the module globals ``df_new_data``, ``pathdir`` and ``dir_dict``;
    the first two are assigned by the ``__main__`` section of this script.

    Args:
        df_only_name: Iterable of names (values of the ``name`` column).
            Pass a slice to limit how many are processed.

    Note:
        The records are passed to ``chang_to_bin`` as-is, so nothing here
        de-duplicates against records already in the file. To merge with the
        existing content instead, read it with ``read_file_out``, concatenate
        it with the new rows, de-duplicate on ``date`` again and write the
        result in overwrite mode.
    """
    # Create the output directories up front; writing into a missing
    # directory would otherwise fail. exist_ok tolerates concurrent workers.
    for sub_dir in dir_dict.values():
        os.makedirs(os.path.join(pathdir, sub_dir), exist_ok=True)

    for name in df_only_name:
        df_t_name = df_new_data[df_new_data["name"] == name]
        for key, sub_dir in dir_dict.items():
            file_t_name = os.path.join(pathdir, sub_dir, name)

            # reindex (rather than plain column selection) keeps the original
            # behaviour of yielding a NaN column when `key` is absent.
            df_t_name_key = (
                df_t_name.reindex(columns=["date", key])
                .rename(columns={key: "content"})
                .drop_duplicates(subset=["date"], keep="last")
                .reset_index(drop=True)
            )

            # No explicit touch() needed: chang_to_bin opens the file for
            # writing, which creates it if it does not exist yet.
            chang_to_bin(file_t_name, df_t_name_key)


def run(df_only_name1, semaphore):
    """Thread entry point: process one batch of names under a semaphore.

    Args:
        df_only_name1: Batch of names forwarded unchanged to ``dowork``.
        semaphore: Semaphore limiting how many batches run at once. It is
            acquired before the work starts and always released afterwards.
    """
    # `with` releases the permit even when dowork() raises; the manual
    # acquire()/release() pair leaked it on any exception.
    with semaphore:
        dowork(df_only_name1)

if __name__ == '__main__':
    # Script entry point: load the new data, split the distinct names into
    # fixed-size batches, export each batch in its own thread, then wait for
    # all of them.

    # Directory that receives the updated data. Must be a raw string: in a
    # normal literal "\u" starts a unicode escape and the file fails to parse.
    pathdir = r"F:\python\untitled1\core\do_sings_user"
    # Alternative output location previously used:
    # pathdir = r"F:\通达信\New tdx vip2020.03开心果整合版红顶栏\New tdx vip2020.03开心果整合版\T0002\signals"

    # pathdir and df_new_data are module globals read by dowork().
    df_new_data = get_new_data()
    df_only_name = df_new_data["name"].drop_duplicates(keep='last')
    all_nums = len(df_only_name)
    print(all_nums)

    every_batch = 13
    epochs = all_nums // every_batch  # number of full batches
    print(epochs)

    # Upper bound on the number of batches being processed at the same time.
    num_of_thread = 303
    semaphore = threading.BoundedSemaphore(num_of_thread)

    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    threads = []
    for i, begin in enumerate(range(0, all_nums, every_batch)):
        # Clamp to all_nums (exclusive end). The previous `all_nums - 1`
        # silently dropped the last name from the final batch.
        end = min(begin + every_batch, all_nums)
        df_only_name1 = df_only_name.iloc[begin:end]
        t = threading.Thread(target=run, args=(df_only_name1, semaphore))
        t.start()
        threads.append(t)
        print(i, begin, end)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

    # join() returns as soon as each worker finishes, instead of polling
    # threading.active_count() every 10 seconds.
    for t in threads:
        t.join()

    print('-----all threads done-----')
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
# Original source: https://www.cnblogs.com/rongye/p/13196067.html