Generating large data sets with pandas

# coding=utf-8
import pandas as pd
import numpy as np
import uuid
from hashlib import sha256

# number of rows written to id_sha256.csv per batch
batch_size = 200000
# total number of id rows to generate
total_samples = 10000000
# output path of the id CSV
path_id_csv = "./id_sha256.csv"
# if True, generate int64 ids; if False, generate sha256 hex strings (pandas object dtype)
numeric = True
# if True, write the CSV without the "id" header row
no_header = True


def foo(band):
    # For each [start, end) pair, yield the full list of indices in that range.
    for a, b in band:
        yield list(range(a, b))


def value_sha(a, b):
    # Build one batch of ids for the inclusive index range [a, b].
    if numeric:
        return list(range(a, b + 1))
    t = []
    for _ in range(a, b + 1):
        # uuid1() returns a fresh uuid on every call, so hashing it yields a distinct sha256 id each time
        uid = str(uuid.uuid1()).replace("-", "")
        t.append(sha256(uid.encode("utf-8")).hexdigest())
    return t


def gen_id(batch_size, samples):
    rangers = [[k, k + batch_size] for k in range(0, samples, batch_size)]
    generator = foo(rangers)  # <class 'generator'>
    for index, value in enumerate(generator):
        a, b = value[0], value[-1]
        v = value_sha(a, b)
        if numeric:
            df = pd.DataFrame(np.array(v), columns=["id"], dtype=np.int64)
        else:
            df = pd.DataFrame(np.array(v), columns=["id"])  # sha256 hex strings, object dtype
        if index == 0:
            print(df.dtypes)
            if no_header:
                df.to_csv(path_id_csv, index=False, header=False)
            else:
                df.to_csv(path_id_csv, index=False)
        else:
            df.to_csv(path_id_csv, index=False, header=False, mode="a")
        print(
            f"batch {index + 1}: wrote {len(value)} rows (batch_size={batch_size}), index range [{value[0]},{value[-1]}]")


def check_set():
    # Sanity check: count distinct values in the id column of the generated CSV.
    # Respect the no_header switch, otherwise the first id would be consumed as the header row.
    df = pd.read_csv(path_id_csv, header=None if no_header else 0)
    ids = df.iloc[:, 0].tolist()
    print(f"{path_id_csv} id column rows after deduplication:", len(set(ids)))


if __name__ == '__main__':
    import time

    start = time.time()
    gen_id(batch_size, total_samples)
    print(time.time() - start)
    print(f"<<<<<<<<<<finish gen {total_samples} rows sha256 id to {path_id_csv}<<<<<<<<<")
    # check_set()

The id column is generated either from sha256 hashes or from a plain numeric id range.
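
If the generated file is too large for check_set() to load in one pass, the same uniqueness check can be done in chunks. A minimal sketch, where the helper name count_unique_ids and the chunk size are assumptions rather than part of the original script:

import pandas as pd

def count_unique_ids(path="./id_sha256.csv", chunksize=200000):
    # Hypothetical chunked variant of check_set(): read the id CSV piece by piece
    # so the whole column never has to sit in memory at once.
    seen = set()
    total = 0
    for chunk in pd.read_csv(path, header=None, chunksize=chunksize):
        ids = chunk.iloc[:, 0]
        total += len(ids)
        seen.update(ids)
    print(f"total rows: {total}, unique ids: {len(seen)}")
    return total, len(seen)

if __name__ == '__main__':
    count_unique_ids()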

Generate the output data: large feature data keyed to the id column of the CSV produced above, so that different parties' data sets can be intersected on that column (a merge sketch follows the script below).

import pandas as pd
import numpy as np

__author__ = 'Chenquan'
# Note: before running generate_output.py, run shamsg_unique.py first to generate the id column CSV that this script reads.
""">>>> 100k rows x 1000 columns cost 143.43s <<<<< 100k rows x 10 columns cost 2.02s"""
# number of feature columns
col = 10

# total number of sample rows to generate; must equal the number of id rows in id_sha256.csv
totals_row = 100000

# number of samples written to the output per yielded batch in save_data; 2000, 5000, or 10000 are suggested
batch_size = 20000

# output path for the guest or host data set
target_path = "./breast_b.csv"

# path of the id CSV
id_csv_path = "./id_sha256.csv"  # the id column may be numeric or sha256 strings (object dtype)

# whether the generated data includes a label column
label_switch = True
# dtype of the data set's id column: if the ids in id_csv_path are numeric, use np.int64; otherwise leave them as strings (object)
numeric = True

if batch_size > totals_row:
    raise ValueError(f"batch_size ({batch_size}) can't be larger than totals_row ({totals_row})")


def yield_id():
    # Read the id CSV in chunks of batch_size rows and yield each chunk's id values as a list.
    data_set = pd.read_csv(id_csv_path, chunksize=batch_size, iterator=True, header=None)
    for it in data_set:
        yield it.iloc[:, 0].tolist()


def concat(with_label):
    ids = yield_id()
    for id_list in ids:
        n = len(id_list)  # usually batch_size; the last chunk may be smaller
        id_type = np.int64 if numeric else None
        df_id = pd.DataFrame(id_list, columns=["id"], dtype=id_type)
        # n x col feature matrix drawn from a standard normal distribution, rounded to 5 decimals
        value_a = np.around(np.random.normal(0, 1, (n, col)), decimals=5)
        df_feature = pd.DataFrame(value_a, columns=[f"x{i}" for i in range(col)])
        if with_label:
            df_y = pd.DataFrame(np.random.choice(2, n), dtype=np.int64, columns=["y"])
            one_iter_data = pd.concat([df_id, df_y, df_feature], axis=1, ignore_index=False)
        else:
            one_iter_data = pd.concat([df_id, df_feature], axis=1, ignore_index=False)
        yield one_iter_data


def save_data(path, with_label):
    """If with_label is True, the file written to path includes a label column y."""
    one_batch = concat(with_label)
    for index, df_dt in enumerate(one_batch):
        if index == 0:
            print(df_dt.dtypes, "\n")
            print(f"header of csv:\n{df_dt.columns.values.tolist()}")
            df_dt.to_csv(path, index=False)
        else:
            df_dt.to_csv(path, index=False, mode="a", header=False)


if __name__ == '__main__':
    import time

    start = time.time()
    idsha256 = pd.read_csv(id_csv_path, header=None)
    id_sha256_rows = idsha256.shape[0]
    if totals_row != id_sha256_rows:
        raise ValueError(
            f"Sample total rows {totals_row} must equal the number of id rows in id_sha256.csv: {id_sha256_rows}")
    save_data(target_path, with_label=label_switch)
    print(time.time() - start)
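
Once a guest file and a host file have been generated against the same id column, the intersection mentioned above can be checked locally with an ordinary pandas merge. A minimal sketch, where ./breast_a.csv is an assumed second output produced by rerunning the script with a different target_path:

import pandas as pd

# Hypothetical local intersection check: inner-join two generated data sets on "id".
guest = pd.read_csv("./breast_b.csv")   # generated above with label_switch = True
host = pd.read_csv("./breast_a.csv")    # assumed second run with another target_path
overlap = guest.merge(host, on="id", how="inner", suffixes=("_guest", "_host"))
print(f"guest rows: {len(guest)}, host rows: {len(host)}, intersection rows: {len(overlap)}")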

  

Original post: https://www.cnblogs.com/SunshineKimi/p/12470040.html