Python的并行求和例子

先上一个例子,这段代码是为了评估一个预测模型写的,详细评价说明在

https://www.kaggle.com/c/how-much-did-it-rain/details/evaluation,

它的核心是要计算

在实际计算过程中,n很大(1126694),以至于单进程直接计算时间消耗巨大(14分10秒),

所以这里参考mapReduce的思想,尝试使用多进程的方式进行计算,即每个进程计算一部分n,最后将结果相加再计算C

代码如下:

import csv
import sys
import logging
import argparse
import numpy as np
import multiprocessing
import time

# configure logging
logger = logging.getLogger("example")

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s %(name)s: %(message)s'))

logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def H(n, z):
    return (n-z) >= 0

def evaluate(args, start, end):
    '''handle range[start, end)'''
    logger.info("Started %d to %d" %(start, end))
    expReader = open('train_exp.csv','r')
    expReader.readline()
    for i in range(start):
        _ = expReader.readline()
    predFile = open(args.predict)
    for i in range(start+1):
        _ = predFile.readline()
    predReader = csv.reader(predFile, delimiter=',')
    squareErrorSum = 0
    totalLines = end - start
    for i, row in enumerate(predReader):
        if i == totalLines:
            logger.info("Completed %d to %d" %(start, end))
            break
        expId, exp = expReader.readline().strip().split(',')
        exp = float(exp)
        predId = row[0]
        row = np.array(row, dtype='float')
        #assert expId == predId
        #lineSum = 0
        for j in xrange(1,71):
            n = j - 1
            squareErrorSum += (row[j]-(n>=exp))**2
            #squareErrorSum += (row[j]-H(n,exp))**2
            #lineSum += (row[j]-H(n,exp))**2
    logger.info('SquareErrorSum %d to %d: %f' %(start, end, squareErrorSum))
    return squareErrorSum

def fileCmp(args):
    '''check number of lines in two files are same'''
    for count, line in enumerate(open('train_exp.csv')):
        pass
    expLines = count + 1 - 1 #discare header
    for count, line in enumerate(open(args.predict)):
        pass
    predictLines = count + 1 - 1
    print 'Lines(exp, predict):', expLines, predictLines
    assert expLines == predictLines
    evaluate.Lines = expLines
    
if __name__ == "__main__":
    # set up logger
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--predict', 
                        help=("path to an predict probability file, this will "
                              "predict_changeTimePeriod.csv"))
    args = parser.parse_args()
    fileCmp(args)
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = []
    blocks = multiprocessing.cpu_count()
    linesABlock = evaluate.Lines / blocks
    for i in xrange(blocks-1):
        result.append(pool.apply_async(evaluate, (args, i*linesABlock, (i+1)*linesABlock)))
    result.append(pool.apply_async(evaluate, (args, (i+1)*linesABlock, evaluate.Lines+1)))
    pool.close()
    pool.join()
    result = [res.get() for res in result]
    print result
    print 'evaluate.Lines', evaluate.Lines
    score = sum(result) / (70*evaluate.Lines)
    print "score:", score

这里是有几个CPU核心就分成几个进程进行计算,希望尽量榨干CPU的计算能力。实际上运行过程中CPU的占用率也一直是100%

测试后计算结果与单进程一致,计算时间缩短为6分27秒,只快了一倍。

提升没有想象中的大。

经过尝试直接用StringIO将原文件每个进程加载一份到内存在进行处理速度也没有进一步提升,结合CPU的100%占用率考虑看起来是因为计算能力还不够。

看来计算密集密集型的工作还是需要用C来写的:)

C的实现要比python快太多了,单线程只需要50秒就能搞定,详见:

http://www.cnblogs.com/instant7/p/4313649.html

原文地址:https://www.cnblogs.com/instant7/p/4312786.html