并发二分法查找算法【非分布式】

  1. 根据node个数,从数组中选出node个数据并发进行验证
import time
from multiprocessing import Process
import os

from multiprocessing import Pool
import os, time, random


# Multi-process bisection helper: checks only the midpoint of the slice it is given
def midd_search(check_list, check_function):
    """Check only the middle element of ``check_list``.

    This is the unit of work handed to one worker process: a single probe,
    with no recursion into either half.

    Args:
        check_list: Sequence of candidates to probe.
        check_function: Callable applied to the middle element; its result
            is interpreted by truthiness.

    Returns:
        ``False`` when ``check_list`` is empty; otherwise a tuple
        ``(passed, element)`` where ``element`` is ``check_list[len // 2]``
        and ``passed`` is ``True``/``False`` according to ``check_function``.
    """
    size = len(check_list)
    if size == 0:
        return False
    probe = check_list[size // 2]
    return bool(check_function(probe)), probe


# # 多线程二分法
# def get_issue_version1(version_list, check_function):
#     return midd_search(version_list, check_function)


# 常规二分法
def binary_search(check_list, check_function):
    """Classic single-process bisection over ``check_list``.

    The list is expected to be ordered so that passing elements come before
    failing ones. The search keeps a half-open window ``[low, high)`` and
    probes its midpoint until the window is exhausted.

    Args:
        check_list: Sequence of candidates.
        check_function: Callable returning a truthy value when a candidate
            is OK.

    Returns:
        ``False`` for an empty ``check_list``. Otherwise a one-entry dict
        describing the last probe made: ``{'last OK': element}`` when that
        probe passed, or ``{'first error': element}`` when it failed.
    """
    low, high = 0, len(check_list)
    if high == 0:
        return False
    while True:
        middle = low + (high - low) // 2
        candidate = check_list[middle]
        if check_function(candidate):
            # Candidate is fine: continue in the part to its right.
            low = middle + 1
            if low == high:
                return {'last OK': candidate}
        else:
            # Candidate is broken: continue in the part to its left.
            high = middle
            if low == high:
                return {'first error': candidate}


def get_issue_version(version_list, check_function, result_list=None):
    """Run the multi-process bisection over ``version_list``.

    Thin entry point around ``multi_process_search`` (the single-process
    alternative in this file is ``binary_search``).

    Args:
        version_list: Candidate versions to bisect.
        check_function: Callable returning a truthy value for a good
            version. It is executed in worker processes.
        result_list: Optional list that receives one ``(passed, version)``
            tuple per probe. When omitted, a private list is used so the
            search still runs; the probes are then not visible to the caller.

    Returns:
        Whatever ``multi_process_search`` returns (``None`` in the visible
        implementation; the findings are delivered through ``result_list``).
    """
    # Forwarding the default ``None`` would make the search fail when it
    # tries to extend the accumulator, so substitute an empty list.
    if result_list is None:
        result_list = []
    return multi_process_search(version_list, check_function, result_list)


def multi_process_search(check_list, check_function, return_result_list, node_num=4):
    """Narrow down the pass/fail boundary of ``check_list`` with a process pool.

    Each round splits ``check_list`` into at most ``node_num`` contiguous
    chunks and probes the midpoint of every chunk in a worker process (via
    ``midd_search``). The next round then recurses into the window that
    starts at the right-most passing probe and ends just before the
    left-most failing probe. Recursion stops once that window holds fewer
    than two elements.

    Args:
        check_list: Sequence of candidates, expected to be ordered so that
            passing items precede failing ones.
        check_function: Callable returning a truthy value for a good
            candidate. It is shipped to worker processes, so it has to be
            usable from them (e.g. defined at module level).
        return_result_list: List extended in place with the
            ``(passed, item)`` tuple of every probe from every round.
            ``None`` is tolerated and replaced by a throw-away list.
        node_num: Number of worker processes, which is also the maximum
            number of chunks per round. Defaults to 4.

    Returns:
        Always ``None``; results are reported through ``return_result_list``.

    Raises:
        ValueError: If ``node_num`` is smaller than 1.
    """
    if node_num < 1:
        raise ValueError('node_num must be >= 1, got %r' % (node_num,))
    if return_result_list is None:
        return_result_list = []
    # Nothing to probe; a zero chunk length would also break range() below.
    if not check_list:
        return None

    # Ceiling division: chunk length such that at most node_num chunks exist.
    chunk_len, remain = divmod(len(check_list), node_num)
    if remain > 0:
        chunk_len += 1
    chunk_starts = range(0, len(check_list), chunk_len)
    binary_list = [check_list[start:start + chunk_len] for start in chunk_starts]

    print('org_check_list %s' % str(check_list))
    print('binary_check_list %s' % str(binary_list))

    # The context manager guarantees the pool is torn down even if a worker
    # raises while the results are being collected.
    with Pool(node_num) as pool:
        # Submit everything first and collect afterwards, so that the probes
        # actually run concurrently instead of one after another.
        pending = [
            pool.apply_async(midd_search, args=(chunk, check_function))
            for chunk in binary_list
        ]
        print('Waiting for all subprocesses done...')
        pool.close()
        pool.join()
        print('All subprocesses done.')
        result_list = [job.get() for job in pending]

    print(result_list)
    return_result_list.extend(result_list)

    # midd_search probes chunk[len(chunk) // 2]; map each probe back to its
    # position in check_list. Working with positions (instead of comparing
    # the values and calling list.index) stays correct when the list holds
    # duplicate values and avoids a linear lookup.
    probe_positions = [
        start + len(chunk) // 2
        for start, chunk in zip(chunk_starts, binary_list)
    ]
    passed_at = [pos for pos, (ok, _) in zip(probe_positions, result_list) if ok]
    failed_at = [pos for pos, (ok, _) in zip(probe_positions, result_list) if not ok]

    # No passing probe: keep the window anchored at the first element.
    max_true = max(passed_at) if passed_at else 0
    # No failing probe: the window extends to the end of the list.
    min_false = min(failed_at) if failed_at else len(check_list)

    if max_true + 1 < min_false:
        # The window deliberately starts AT max_true rather than after it:
        # when nothing passed, max_true is 0 and index 0 is still unchecked.
        return multi_process_search(
            check_list[max_true:min_false],
            check_function,
            return_result_list,
            node_num,
        )
    return None


# 版本是否ok,True ok
def check_version(version):
    """Sample check function: a version counts as OK when it is <= 69.

    Sleeps for 0.1 s, prints the id of the process doing the check and the
    verdict, then returns the verdict.

    Args:
        version: Value compared against the threshold 69.

    Returns:
        ``True`` if ``version <= 69``, otherwise ``False``.
    """
    time.sleep(0.1)
    print('process %s checking..' % os.getpid())
    if not version <= 69:
        print('%s not OK!' % version)
        return False
    print('%s OK!' % version)
    return True


if __name__ == '__main__':
    # NOTE (from the original author): the function handed to the worker
    # processes must not be defined inside this guard; it has to live at
    # module level, which is why check_version is defined above.

    # Candidate versions, in ascending order.
    alist = [
        5, 10, 15, 18, 35, 55, 65, 75, 99, 100, 111,
        112, 113, 115, 116, 118, 119, 121, 122, 123, 124, 125,
    ]
    # Receives one (passed, version) tuple per probe.
    result_list = []
    res = get_issue_version(alist, check_version, result_list)
    print(res)
    # Tuples compare element-wise and False sorts before True, so when any
    # probe failed the minimum is the failing probe with the smallest version.
    print(min(result_list))

  2. 直接按照node个数,分割数组
# 缺点
每个node会递归计算完自己的list,但是这个list如果全部failed/pass,就是多余计算

# 优点
实现简单
  3. 按照node个数,间隔划分
# 比如两个node,就按照奇数偶数划分list

看上去不错,不过貌似和方法2一样

原文地址:https://www.cnblogs.com/amize/p/15152631.html