《Hadoop权威指南》笔记 第二章 Hadoop Streaming

什么是Hadoop Streaming

   

Hadoop提供的一个编程工具,允许用户使用任何可执行文件或脚本作为mapperReducer

   

一个例子(shell简洁版本)

   

$HADOOP_HOME/bin/hadoop jar

$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar

-input myInputDirs

-output myOutputDir

-mapper cat

-reducer wc

   

解析:

   

首先找到Hadoop Streaming所在的包

然后定义输入输出路径

然后定义mapperreducer

   

这里是用shell中的cat作为mapperwc作为reducer

   

同一个例子(shell详细版),每行可能有多个单词

   

mapper.sh

   

#! /bin/bash

while read LINE; do

for word in $LINE

do

echo "$word 1"

done

done

   

reducer.sh

   

#! /bin/bash

count=0

started=0

word=""

while read LINE;do

newword=`echo $LINE | cut -d ' ' -f 1`

if [ "$word" != "$newword" ];then

[ $started -ne 0 ] && echo "$word $count"

word=$newword

count=1

started=1

else

count=$(( $count + 1 ))

fi

done

echo "$word $count"

   

同一个例子(python)

   

map

   

#!/usr/bin/env python

 

import sys

 

# maps words to their counts

word2count = {}

 

# input comes from STDIN (standard input)

for line in sys.stdin:

# remove leading and trailing whitespace

line = line.strip()

# split the line into words while removing any empty strings

words = filter(lambda word: word, line.split())

# increase counters

for word in words:

# write the results to STDOUT (standard output);

# what we output here will be the input for the

# Reduce step, i.e. the input for reducer.py

#

# tab-delimited; the trivial word count is 1

print '%s %s' % (word, 1)

   

reducer

   

#!/usr/bin/env python

 

from operator import itemgetter

import sys

 

# maps words to their counts

word2count = {}

 

# input comes from STDIN

for line in sys.stdin:

# remove leading and trailing whitespace

line = line.strip()

 

# parse the input we got from mapper.py

word, count = line.split()

# convert count (currently a string) to int

try:

count = int(count)

word2count[word] = word2count.get(word, 0) + count

except ValueError:

# count was not a number, so silently

# ignore/discard this line

pass

 

# sort the words lexigraphically;

#

# this step is NOT required, we just do it so that our

# final output will look more like the official Hadoop

# word count examples

sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

 

# write the results to STDOUT (standard output)

for word, count in sorted_word2count:

print '%s %s'% (word, count)

   

测试:

   

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar

-input myInputDirs

-output myOutputDir

-mapper Mapper.py

-reducer Reducerr.py

-file Mapper.py

-file Reducer.py

   

或者本地测试

   

cat input.txt | python Mapper.py | sort | python Reducer.py

   

书中的例子(ruby版本)

   

Ruby版本

   

map函数:

   

#! /usr/bin/env ruby

STDIN.each_line do |line|

var = line

year, temp, q = val[15,4], val[87,5], val[92,1]

puts "#{year} #{temp}" if (temp != "+9999" && q =~ /[01459]/)

end

   

   

   

reduce函数:

   

#! /usr/bin/env ruby

   

last_key, max_val = nil, 0

STDIN.each_line do |line|

key, val = line.split(" ")

if last_key && last_key != key

puts "#{last_key} #{max_val}"

last_key, max_val = key, val.to_i

else

last_key, max_val = key, [max_val, val.to_i].max

end

end

puts "#{last_key} #{max_val}" if last_key

   

reduce的输入是map的输出, 并且已经被Hadoop按照key排过序了.

   

所以, 如果 last_key 不为空, 可以代码还没有到最后; last_key!=key, 就是说这个key是一个新key. 打印之前key一条记录, 因为之前的key所有值都处理完了.

   

并且更新last_key, 最大值就是val.

   

如果是重复的key, 就和存储的最大值进行比较.

   

最后再打印最后一个K/V pair.

   

   

测试

   

   

书中的例子(python版本)

   

   

Hadoop Streaming编程原理

   

mapperreducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出,Streaming工具会创建MapReduce job,发送给各个taskTracker,同时监控整个job的执行过程

   

如果一个文件(可执行文件或者脚本)作为mappermapper初始化时,每一个mapper任务会把文件作为一个单独的进程启动

   

mapper任务运行时,把输入切分成行,然后把每一行提供给可执行文件进程的标准输入。同时mapper收集可执行文件进程的标准输出内容,并把收到的每一行内容转化为key/value对作为mapper的输出。

   

默认情况下,一行的第一个tab之前的作为key,后面的作为value

   

如果没有tab,整行作为keyvalue为空

   

用法

   

Hadoop jar + Hadoop Streaming jar + option

   

option有:

   

-input

-output

-mapper

-reducer

-file:打包文件到提交的作业中,可以使mapper或者reducer要用的输入文件,如配置文件,字典等

-partitioner

-combiner

-D:作业的一些属性,以前用的是-jobconf

   

mapred.map.tasksmap task的数目

mapred.reduce.tasks

stream.map.input.field.separator/stream.map.output.field.separatormap输入输出的分隔符,默认为

   

本地测试:

   

cat input.txt|python Mapper.py|sort|python Reducer.py

   

或者

   

cat input.txt|./Mapper|sort|./Reducer

   

摘自

   

http://dongxicheng.org/mapreduce/hadoop-streaming-programming/

原文地址:https://www.cnblogs.com/keedor/p/4393717.html