aiofiles拆分大文件

import asyncio
import aiofiles
import time
import csv
import os



async def main(out_path, infile, num=1000000):
    """
    Split a text/CSV file into numbered part files using aiofiles.

    The whole input is read into memory and cut into consecutive chunks of
    ``num`` raw lines. Chunk ``i`` (1-based) is written to
    ``<out_path>/<basename>_<i>.csv``. The first line of the input is treated
    as the header: the first chunk already contains it, and it is prepended
    to every later chunk so that each part file starts with the header.

    Splitting is line based, not CSV aware: a quoted field that contains a
    line break may end up cut across two part files.

    :param out_path: directory the part files are written to (it is not
        created here and must already exist)
    :param infile: path of the input file, read as UTF-8
    :param num: maximum number of input lines per chunk (positive integer)
    :return: None
    :raises ValueError: if ``num`` is smaller than 1
    """
    if num < 1:
        raise ValueError("num must be a positive integer")

    # Same naming rule as before: everything up to the first dot of the name.
    basename = os.path.basename(infile).split('.')[0]
    start = time.time()

    # Close the input as soon as it has been read instead of keeping it open
    # while all the part files are being written.
    async with aiofiles.open(infile, "r", encoding="utf-8") as fp:
        contents = await fp.readlines()

    head = contents[0] if contents else ""

    # Enumerating the chunk offsets gives every chunk - including a short
    # (or the only) trailing one - a well-defined 1-based index.
    for file_idx, offset in enumerate(range(0, len(contents), num), start=1):
        chunk = contents[offset:offset + num]
        print(len(chunk))
        file = os.path.join(out_path, f"{basename}_{file_idx}.csv")
        async with aiofiles.open(file, "w", encoding="utf-8") as fw:
            # The first chunk starts with the header line itself; writing
            # ``head`` there as well would duplicate it.
            if file_idx > 1:
                await fw.write(head)
            await fw.writelines(chunk)

    print(f"end1 {time.time() - start}")
    print(len(contents))


def _write_csv_part(path, header, rows):
    """Write ``header`` followed by ``rows`` to a new CSV file at ``path``."""
    # newline='' lets the csv module control line endings itself.
    with open(path, "w", newline='', encoding='utf-8') as fw:
        writer = csv.writer(fw)
        writer.writerow(header)
        writer.writerows(rows)


def read_csv_sync(out_path, infile, num=1000000):
    """
    Split a CSV file into numbered part files using the synchronous csv module.

    The first record of the input is taken as the header. The remaining
    records are grouped into chunks of at most ``num`` rows, and chunk ``i``
    (1-based) is written, preceded by the header, to
    ``<out_path>/<basename>_<i>.csv``. NUL characters are stripped from the
    input before parsing and completely blank rows are skipped. An input
    without any data rows produces no part files.

    :param out_path: directory the part files are written to (it is not
        created here and must already exist)
    :param infile: path of the input CSV file, read as UTF-8
    :param num: maximum number of data rows per part file (positive integer)
    :return: None
    :raises ValueError: if ``num`` is smaller than 1
    """
    if num < 1:
        raise ValueError("num must be a positive integer")

    basename = os.path.basename(infile).split('.')[0]
    start = time.time()

    with open(infile, newline='', encoding='utf-8') as f:
        # Strip NUL characters line by line before the csv parser sees them.
        reader = csv.reader(line.replace('\0', '') for line in f)
        header = next(reader, None)

        file_idx = 0
        rows = []
        for row in reader:
            if not row:
                # csv.reader yields [] for a blank line; such rows carry no
                # data, so they are left out of the part files.
                continue
            rows.append(row)
            if len(rows) == num:
                file_idx += 1
                print(len(rows))
                _write_csv_part(
                    os.path.join(out_path, f"{basename}_{file_idx}.csv"),
                    header,
                    rows,
                )
                rows = []

        # Remaining rows that did not fill a whole chunk.
        if rows:
            file_idx += 1
            print(len(rows))
            _write_csv_part(
                os.path.join(out_path, f"{basename}_{file_idx}.csv"),
                header,
                rows,
            )

    print(f"end2 {time.time() - start}")




if __name__ == '__main__':
    # Separate output directories for the async and the sync run, so the two
    # sets of part files do not overwrite each other.
    o_path = r'D:\常用保存文件\split'
    o_path2 = r'D:\常用保存文件\split2'
    in_file = r'D:\临时文件\test.csv'
    # asyncio.run() creates a fresh event loop, runs the coroutine and closes
    # the loop again; it replaces the deprecated
    # get_event_loop()/run_until_complete()/close() sequence.
    asyncio.run(main(o_path, in_file))
    read_csv_sync(o_path2, in_file)





 

测试文件共 3852733 行（含首行表头），异步（end1）与同步（end2）拆分的耗时对比（单位：秒）：

    1000000
 1000000
 1000000
 852733
 end1 5.248189210891724
 3852733
 end2 9.213284492492676

  

原文地址:https://www.cnblogs.com/zzay/p/15701133.html