利用多线程对大数组进行处理

一、场景

在进行模型训练的过程中,会对一些大数据资源进行处理。比如对一个(80000,35)进行数据的替换和选择

二、涉及内容

对上述场景进行处理,采取的想法是分解数组,处理完在合并,需要做到以下的要求:

1. 数据顺序正确
2. 高维数组不能过长迭代(多个for嵌套)

因此需要涉及到进程池,数组迭代,数组拼接等内容处理流程如下:

三、实现

例子:将一个(10,20)维数组里面大于20的值变为0。

3.1 数组拆解

使用numpy提供的数组分割的方法实现分割

import numpy as np
# 第一个是需要分解的数组,第二个是分解的个数
# 注意如果是按列堆叠,元素必须保持一致,因此要尽可能的等分
grey_scale_chunks = np.array_split(data_array, 5)

3.2 迭代处理

迭代是进行元素的获取,如果要修改可以使用枚举

# 使用nditer遍历数组,设置读写操作,默认是只读
arr = np.array([1, 2, 3])

# op_dtypes:操作类型
for x in np.nditer(a, op_flags=['readwrite']):
    if x > 20:
        x[...] = 0
print(a)

# 使用枚举的方法
for idx, i in np.ndenumerate(binary_array):
    if i > 20:
        binary_array[idx] = i

3.3 多线程处理

使用线程池实现处理

# number_of_threads设置进程数,可以使用os.getcpu()获取cpu信息。
pool = ThreadPool(processes=number_of_threads)
results = pool.map_async(perform_calc, grey_scale_chunks)
pool.close()
pool.join()  # Block until all threads exit.
results.get()  # 获取处理的结果

3.4 合并数组

# 使用numpy提供的方法进行数组的堆叠
# numpy提供了三种堆叠的方式:
# 沿行的堆叠方式(hstack):将每行数据进行拼接,不要求堆叠的数据同型。
# 沿列的堆叠方式(vstack):将每行数据按着竖直方向进行堆叠,要求数据同型。
# 沿深度堆叠的方式(dstack):每次选取第i列的所有数据组成一行,直到遍历完所有列。要求数据同型。

# 根据需求按照沿列的方式进行堆叠。
np_re = np.vstack(results.get())

3.5 总代码

将所有的内容融合在一起:

from multiprocessing.pool import ThreadPool
import numpy as np
import time
def perform_calc(binary_array):
    # perform some operation
    for idx, i in np.ndenumerate(binary_array):
        if i > 20:
            binary_array[idx] = i
    return binary_array


def main(data_array):
    number_of_threads = 6
    pool = ThreadPool(processes=number_of_threads)
    grey_scale_chunks = np.array_split(data_array, 5)
    start = time.clock()
    results = pool.map_async(perform_calc, grey_scale_chunks)
    pool.close()
    pool.join()  # Block until all threads exit.
    end = time.clock()
    print(end - start)
    print(len(results.get()))
    # Final results will be a list of arrays.
    result1 = np.vstack(results.get())
    print(result1)
    print(result1.shape)


grey_arr = np.random.randint(1, 40, size=(10, 20))
print(grey_arr)
main(grey_arr)

四、总结

在大数据中的处理,这种思想有着比较好的方法,主要的问题就是设计线程间的通讯问题,主要是操作系统的内容,这里做了一个简单的介绍。

参考

w3school数组连接

python线程处理切割数组(涉及死锁处理等问题)

numpy枚举

numpy遍历

原文地址:https://www.cnblogs.com/future-dream/p/14721077.html