python对文件进行并行计算初探

最近工作中经常会有读取一个文件,对数据做相关处理并写入到另外一个文件的需求

当文件行数较少的时候,单进程顺序读取是没问题的,但是当文件行数过万,就需要消耗很客观的时间。

一、一次性读入,多进程处理

我最初想到的办法是多进程,最初的办法是一次性读取所有行,然后分配给多个进程处理,最终还是写入一个文件。其中需要借助Queue来实现对异常的捕获和处理,不具有可扩展性。

同时一次性读取一个文件,写入内存也受到内存大小的限制。而且这种多进程情况下返回值的处理也比较麻烦。

代码见python并发——多进程中的异常捕获

二、多次读入,并行处理

考虑到Linux有一个按行分割文件的功能split,可以借助她实现数据并行计算,思路是这样的,通过计算文件的总行数,将文件分割成行数相等的多个小文件,小文件个数可以大于或等于并发度。

开启多进程对每个小文件分别处理,每个小文件处理完都输出到一一对应的目标小文件,最终将目标小文件进行合并。

代码如下:

from multiprocessing import Pool
import json
from time import sleep
import requests
import os

SRC_MID = '_src_'
DST_MID = '_dst_'


# 业务逻辑处理
def get_jw(addr_name):
    url = 'https://restapi.amap.com/v3/geocode/geo?address={addr_name}&output=JSON&key=2f114ef951411f01c24a6384b59307a8'
    result = requests.get(url.format(addr_name=addr_name))
    result_str = str(result.content, encoding="utf-8")
    rj = json.loads(result_str)
    if len(rj['geocodes']) > 0:
        jwd = rj['geocodes'][0]['location']
        print(jwd)
        return addr_name + ',' + jwd + '
'
    else:
        print('-,-')
        return addr_name + ',' + '-,-' + '
'


# 输入源文件,返回分割后的源中间文件list
class ParallelCompute(object):
    def __init__(self, exe_func, source_file, target_file, concurrency=8):
        self.exe_func = exe_func
        self.source_file = source_file
        self.target_file = target_file
        self.concurrency = concurrency
        self.abs_src_mid_dir = None
        self.abs_dst_mid_dir = None
        self.src_mid_file_list = None
        self.dst_mid_file_list = None

    # 源文件分割成多个小文件
    def split_file(self):
        cur_path = os.path.abspath('.')
        self.abs_src_mid_dir = os.path.join(cur_path, SRC_MID)
        self.abs_dst_mid_dir = os.path.join(cur_path, DST_MID)
        os.mkdir(self.abs_src_mid_dir)
        os.mkdir(self.abs_dst_mid_dir)
        split_cmd = "split {src_file} {abs_src_mid}/".format(src_file=self.source_file,
                                                             abs_src_mid=self.abs_src_mid_dir)
        print(split_cmd)
        os.system(split_cmd)
        self.src_mid_file_list = [os.path.join(self.abs_src_mid_dir, it) for it in os.listdir(self.abs_src_mid_dir)]
        self.dst_mid_file_list = [src_file.replace(SRC_MID, DST_MID) for src_file in self.src_mid_file_list]

    # 小文件处理
    def translate_file(self, src_file, dst_file):
        with open(src_file, 'rb') as f1, open(dst_file, 'a') as f2:
            line = f1.readline().strip()
            line = str(line, encoding='utf8')
            while line:
                try:
                    jw = self.exe_func(line)
                    f2.write(jw)
                except Exception:
                    sleep(5)
                    offset = len(line.encode('utf8')) + 1
                    f1.seek(-offset, 1)
                line = f1.readline().strip()
                line = str(line, encoding='utf8')

    # 小文件合并
    def merge_files(self):
        with open(self.target_file, 'a') as f2:
            for dst_m_file in self.dst_mid_file_list:
                with open(dst_m_file, 'r') as f1:
                    line = f1.readline()
                    while line:
                        f2.write(line)
                        line = f1.readline()

    # 清理中间文件
    def delete_mid_dir(self):
        os.system('rm -rf %s' % self.abs_src_mid_dir)
        os.system('rm -rf %s' % self.abs_dst_mid_dir)

    def execute(self):
        p = Pool(self.concurrency)
        self.split_file()
        for src_mid_file in self.src_mid_file_list:
            dst_mid_file = src_mid_file.replace(SRC_MID, DST_MID)
            p.apply_async(self.translate_file, args=(src_mid_file, dst_mid_file,))
        p.close()
        p.join()
        self.merge_files()
        self.delete_mid_dir()


if __name__ == '__main__':
    source_file = '/opt/test/qiuxue/target.txt'
    target_file = '/opt/test/qiuxue/result3.txt'
    pc = ParallelCompute(get_jw, source_file, target_file)
    pc.execute()

这样就做到了并行计算和业务逻辑的分离,简化了调用者的使用难度

原文地址:https://www.cnblogs.com/wangbin2188/p/14083263.html