大数据mapreduce二分法ip定位之Python实现

ip定位数据大约12M,采用-chacheFile 分发

文件来源https://pan.baidu.com/s/1J0pwTafHgt4T0k3vV_gC-A

格式大致格式如下:

0.0.0.0 0.255.255.255 NULL IANA保留地址 NULL
1.0.0.0 1.0.0.255 亚洲 亚太地区 NULL
1.0.1.0 1.0.1.255 亚洲 中国 福建
1.0.2.0 1.0.3.255 亚洲 中国 福建
1.0.4.0 1.0.7.255 大洋洲 澳大利亚 NULL
1.0.8.0 1.0.15.255 亚洲 中国 广东
1.0.16.0 1.0.31.255 亚洲 日本 NULL
1.0.32.0 1.0.63.255 亚洲 中国 广东
1.0.64.0 1.0.127.255 亚洲 日本 NULL
1.0.128.0 1.0.255.255 亚洲 泰国 NULL

用户数据是应该是大量的,这个用于数据输入:

文件来源:https://pan.baidu.com/s/1l6qqr9U2YObl_pyM4r3VjA

数据大致格式如下:

ECEE8FBBBB      113.224.76.226
ED38780B1D      106.36.217.145
120BB4FB44      113.109.42.83
9D4EC87B4B      219.153.212.31
AF0E43C785      111.77.229.40
4AAAEB560B      60.13.190.132
53BAABADD8      124.167.254.130
6C1FADFF90      60.27.188.251
478A215D8C      101.47.19.207
0000000000      106.114.74.120
A7FDC270DF      112.26.70.229
1714BE65F3      36.63.12.44
0EFF6D7DCC      218.75.123.226
0000000000      211.161.248.231

map.py思路,每个计算节点都将ip定位数据加载到内存中,然后输入部分用户数据,进行分析,代码如下:

#!/usr/bin/python
ip_convert = lambda x:sum([256**j*int(i) for j,i in enumerate(x.split('.')[::-1])])#将ip转换为数字
def load_ip_lib_func(ip_lib_fd):#将ip定位信息加载到内存
    ip_lib_list = []
    file_in = open(ip_lib_fd, 'r')
    for line in file_in:
        ss = line.strip().split(' ')
        if len(ss) != 5:
            continue
        start_ip = ss[0].strip()
        end_ip = ss[1].strip()
        area = ss[2].strip()
        country = ss[3].strip()
        province = ss[4].strip()
        ip_lib_list.append((ip_convert(start_ip), ip_convert(end_ip), area, country, province))
    return ip_lib_list

def get_addr(ip_lib_list, ip_str):#二分查找获取地址信息
    ip_num = ip_convert(ip_str)
    low_index = 0
    mid_index = 0
    high_index = len(ip_lib_list) - 1
    while (low_index < high_index):
        mid_index = (low_index + high_index) / 2
        sss = ip_lib_list[mid_index]
        start_ip = sss[0]
        end_ip = sss[1]
        provice = sss[4].strip()
        if ip_num < start_ip:
            high_index = mid_index - 1
        elif ip_num > start_ip:
            low_index = mid_index + 1
    if ip_num < start_ip:
        provice = ip_lib_list[mid_index-1][4]
    else:
        provice = ip_lib_list[mid_index][4]
    return provice
def mapper_func(ip_lib_fd):
    ip_lib_list = load_ip_lib_func(ip_lib_fd)

    for line in sys.stdin:
        ss = line.strip().split('	')
        if len(ss) != 2:
            continue

        cookie = ss[0].strip()
        ip_str = ss[1].strip()

        user_addr = get_addr(ip_lib_list, ip_str)
        print '	'.join([cookie, ip_str, user_addr])



if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

run.sh如下:

HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

INPUT_FILE_PATH_1="/mapreduce/ip/cookie_ip.txt"
OUTPUT_PATH="/mapreduce/ip/out"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH 
    -input $INPUT_FILE_PATH_1 
    -output $OUTPUT_PATH 
    -mapper "python map.py mapper_func ABC" 
    -reducer "cat" 
    -jobconf "mapred.reduce.tasks=5" 
    -jobconf "mapred.map.tasks=5" 
    -jobconf "mapreduce.reduce.memory.mb=5000" 
    -jobconf  "mapred.job.name=ip_lib_demo" 
    -cacheFile "hdfs://master:9000/mapreduce/ip/ip.lib.txt#ABC" 
    -file "./map.py"

原文地址:https://www.cnblogs.com/students/p/8858320.html