Hadoop 2.6.1 configuration on CentOS 7

Three ways to upload (distribute) files to the compute nodes:

1. Local files (-file):

The -file option ships small local files to the compute nodes along with the job.

For example:

-file ./test

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list
run.sh
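The map.py and red.py listed further down are written for the -cacheArchive case, where the mapper's argument is a directory. For this plain -file job, white_list arrives as a single file in each task's working directory, so a mapper sketch for this variant (an illustration only, not code from the original post) can open it directly:

#!/usr/bin/python
# Sketch only: a -file variant of mapper_func. white_list is shipped by
# "-file ./white_list" and shows up as a plain local file in the task's
# working directory, so it can be opened by name.
import sys

def read_local_file_func(f):
    word_set = set()
    for line in open(f, 'r'):
        word_set.add(line.strip())
    return word_set

def mapper_func(white_list_file):
    word_set = read_local_file_func(white_list_file)
    for line in sys.stdin:
        for s in line.strip().split(' '):
            word = s.strip()
            if word != "" and word in word_set:
                print "%s\t%s" % (word, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    func(*sys.argv[2:])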

2. -cacheFile: distributes a file that already lives on HDFS to the compute nodes. (The file must be uploaded to HDFS first.)

For example:

-cacheFile "hdfs://master:9000/white_list#ABC"
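The part after "#" is an alias: on every compute node the HDFS file appears in the task's working directory under that name, so the mapper opens "ABC" exactly as if it were a local file. A minimal sketch (illustrative only, assuming the same white-list use case as above and a mapper invoked as "python map.py mapper_func ABC"):

# Sketch only: reading a -cacheFile distributed white list through its alias.
import sys

def mapper_func(white_list_alias):   # invoked with the alias, e.g. "ABC"
    word_set = set(line.strip() for line in open(white_list_alias, 'r'))
    for line in sys.stdin:
        for word in line.strip().split(' '):
            if word != "" and word in word_set:
                print "%s\t%s" % (word, 1)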

3. -cacheArchive: distributes a compressed archive that already lives on HDFS to the compute nodes. (The archive must be uploaded to HDFS first.)

For example:

-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz"

In this case the streaming framework unpacks the archive for you automatically; you only have to adjust the file paths in your code. The map.py below does exactly that: it treats the alias WH.gz as a directory and reads every file the archive contained.
#!/usr/bin/python
import os
import sys

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    # The -cacheArchive alias (e.g. WH.gz) is a directory holding the
    # unpacked archive, so open every file inside it.
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list


def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            #if word != "" and (word in word_set):
            if word != "":
                print "%s\t%s" % (s, 1)


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])   # function name is the first argument
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
map.py

#!/usr/bin/python

import sys

def reduer_func():
    # Streaming guarantees the reducer input is sorted by key, so all counts
    # for the same word arrive consecutively.
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word == None:
            current_word = word

        if current_word != word:
            # The key changed: flush the counts accumulated for the previous word.
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    # Flush the last word.
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])   # function name is the first argument
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

red.py
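Since both scripts just read stdin and write stdout, they can be smoke-tested locally before submitting anything to the cluster. The sketch below is not from the original post; it assumes map.py and red.py sit in the current directory and that w.tar.gz has been unpacked into a local directory named WH.gz, mimicking what -cacheArchive does on the nodes.

#!/usr/bin/python
# local_test.py -- rough local smoke test of map.py / red.py without Hadoop.
import subprocess

sample = "the man of property the man\n"

# Run the mapper the same way the streaming job would invoke it.
mapper = subprocess.Popen(["python", "map.py", "mapper_func", "WH.gz"],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
map_out, _ = mapper.communicate(sample)

# Hadoop sorts by key between map and reduce; emulate that shuffle with sorted().
shuffled = "".join(sorted(map_out.splitlines(True)))

reducer = subprocess.Popen(["python", "red.py", "reduer_func"],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
red_out, _ = reducer.communicate(shuffled)
print red_out   # one "word<tab>count" line per word
local_test.py

The run.sh below then submits the full -cacheArchive job to the cluster.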

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"
run.sh
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
# (In map.py / red.py, the shebang "#!/usr/bin/env python" lets the system
# locate the Python interpreter on its own.)

# Input files; separate multiple files with commas. They must already be on HDFS.
INPUT_FILE_PATH_1="/1.txt,/2.txt"

# Output directory on HDFS.
OUTPUT_PATH="/table1"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1. Option notes:
#   mapred.reduce.tasks                 - number of reduce tasks
#   mapred.compress.map.output          - compress the data produced by the map stage;
#   mapred.map.output.compression.codec   the codec used for that compression
#   mapred.output.compress              - compress the final job output;
#   mapred.output.compression.codec       the codec used for that compression
#   -cacheArchive                       - the third way to ship files; the part after "#"
#                                         is an alias that can be used in the job options,
#                                         and the slave nodes also create the local
#                                         directory under that alias at run time
#   stream.map.output.field.separator   - field separator for the map output
#                                         (default is the tab character "\t")
#   stream.num.map.output.key.fields    - the first N separated fields form the key,
#                                         the rest is the value
#   map.output.key.field.separator      - separator used inside the key
#   num.key.fields.for.partition        - after splitting the key, bucket (partition)
#                                         on the first N columns
#   -partitioner                        - must be set when you configure partitioning yourself
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -jobconf stream.map.output.field.separator=',' \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=',' \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
run.sh

-jobconf mapred.text.key.partitioner.options=-k2,3 is an extended form of -jobconf num.key.fields.for.partition=1: instead of taking the first N key fields, it selects columns 2 and 3 of the key for partitioning.
Even when you do not configure partitioning explicitly, the default behaviour works the same way in spirit: records are bucketed first, and then sorted by key within each bucket.
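To make the effect of these options concrete, here is a rough Python illustration (an analogy only: the real KeyFieldBasedPartitioner is Java code with its own hash; the reducer count of 2 matches mapred.reduce.tasks=2 in the run.sh above) of how num.key.fields.for.partition=1 together with map.output.key.field.separator=',' decides which reducer a key goes to:

# Illustration only: bucket keys on their first comma-separated field,
# the way num.key.fields.for.partition=1 does with separator ','.
def partition(key, num_reducers=2, separator=',', num_fields=1):
    prefix = separator.join(key.split(separator)[:num_fields])
    return hash(prefix) % num_reducers

for key in ["a,1", "a,2", "b,1", "b,2"]:
    print "%s -> reducer %d" % (key, partition(key))
# "a,1" and "a,2" always land on the same reducer, because only the first
# field ("a") is used for partitioning; the full key is still used for
# sorting inside that reducer's input.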

Additional notes:

You can control the number of map tasks with compressed input files: each compressed file is handled by exactly one map task.

This also leaves the paths untouched, i.e. the directory structure can stay unchanged.



Original article: https://www.cnblogs.com/taozizainali/p/8811893.html