python对文件进行并行计算初探(二)

上次的并行计算是通过将大文件分割成小文件,涉及到文件分割,其实更有效的方法是在内存中对文件进行分割,分别计算

最后将返回结果直接写入目标文件,省去了分割小文件合并小文件删除小文件的过程

代码如下:

import json
import math
from multiprocessing import Pool

import requests

"""
不分割文件,直接起多个进程对文件进行读写
apply_async的callback接收的参数是调用行数的返回值,err_callback接收的参数是抛出来的异常
"""


# 用户业务逻辑
def get_jw(addr_name):
    addr_name=addr_name.split(',')[1]
    url = 'https://restapi.amap.com/v3/geocode/geo?address={addr_name}&output=JSON&key=2f114ef951411f01c24a6384b59307a8'
    result = requests.get(url.format(addr_name=addr_name))
    result_str = str(result.content, encoding="utf-8")
    rj = json.loads(result_str)
    if len(rj['geocodes']) > 0:
        jwd = rj['geocodes'][0]['location']
        print(jwd)
        return addr_name + ',' + jwd + '
'
    else:
        print('-,-')
        return addr_name + ',' + '-,-' + '
'


def my_callback(lines):
    with open('/opt/test/qiuxue/target2.txt', 'a') as f:
        f.writelines(lines)


# 读取分块文件
class Reader(object):
    def __init__(self, file_name, start_pos, end_pos, business_func):
        self.file_name = file_name
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.business_func = business_func

    def execute(self):
        lines = []
        with open(self.file_name, 'rb') as f:
            if self.start_pos != 0:
                f.seek(self.start_pos - 1)
                if f.read(1) != '
':
                    line = f.readline()
                    self.start_pos = f.tell()
            f.seek(self.start_pos)
            while self.start_pos < self.end_pos:
                line = f.readline().strip()
                line = str(line, encoding='utf8')
                try:
                    new_line = self.business_func(line)
                    lines.append(new_line)
                except Exception as e:
                    offset = len(line.encode('utf8')) + 1
                    f.seek(-offset, 1)
                self.start_pos = f.tell()
        return ''.join(lines)


# 将文件分成要求的块数,以list返回起止pos
class FileBlock(object):
    def __init__(self, file_name, block_num):
        self.file_name = file_name
        self.block_num = block_num

    def block_file(self):
        pos_list = []
        with open(self.file_name, 'r') as f:
            f.seek(0, 2)
            start_pos = 0
            file_size = f.tell()
            block_size = math.ceil(file_size / self.block_num)
            while start_pos <= file_size:
                if start_pos + block_size > file_size:
                    pos_list.append((start_pos, file_size))
                else:
                    pos_list.append((start_pos, start_pos + block_size))
                start_pos = start_pos + block_size + 1

        return pos_list


if __name__ == '__main__':
    concurrency = 8
    p = Pool(concurrency)
    input_file = '/opt/test/qiuxue/target.txt'
    fb = FileBlock(input_file, concurrency)
    for s, e in fb.block_file():
        reader = Reader(input_file, s, e, get_jw)
        p.apply_async(reader.execute, callback=my_callback)

    p.close()
    p.join()
原文地址:https://www.cnblogs.com/wangbin2188/p/14113305.html