The First MapReduce Example
The first MapReduce example in the Hadoop Guide processes weather data (sourced from NCDC), and I finally got it running. This post summarizes the steps; installing Hadoop itself is not covered here.
1 Data Preprocessing
1.1 Downloading the Data
The test data has to be downloaded from NCDC's official FTP server. The years range from 1901 to 2016, so downloading by hand is hopeless without a script. The script is as follows:
download.sh
#!/bin/bash
# Download the yearly GSOD tarballs from the NCDC FTP server into ./ncdc/
for i in {1901..2016}
do
    wget --execute robots=off -r -np -nH -P ./ncdc/ --cut-dirs=4 -R "index.html*" ftp://ftp.ncdc.noaa.gov/pub/data/gsod/$i/gsod_$i.tar
done
Running the command [nohup sh download.sh &] downloads all the gsod_*.tar archives from the FTP server. Once the download finishes, the data needs to be preprocessed.
1.2 Preprocessing the Data
Each tar archive contains some number of gzip files, and each gzip file holds a single data text file. A description of every field in the NCDC weather data can be found here; the format looks like this:
STN--- WBAN YEARMODA TEMP DEWP SLP STP VISIB WDSP MXSPD GUST MAX MIN PRCP SNDP FRSHTT
607450 99999 20100101 56.1 22 33.0 22 1012.4 8 975.5 8 5.6 22 9.1 22 19.0 999.9 63.9 48.2* 0.00G 999.9 000000
607450 99999 20100102 53.0 23 34.2 23 1019.5 8 982.2 8 5.8 23 6.7 23 12.0 999.9 66.7 39.0 0.00G 999.9 000000
607450 99999 20100103 50.5 23 34.3 23 1022.2 8 984.6 8 6.2 23 7.7 23 12.0 999.9 64.9 36.5 0.00G 999.9 000000
607450 99999 20100104 53.0 22 34.5 22 1016.5 8 979.3 8 6.4 22 6.5 22 15.9 999.9 64.9 42.8* 0.00G 999.9 000000
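The records are fixed-width, so individual fields can be extracted by character offset rather than by splitting on whitespace; the mapper in section 2.1 relies on exactly this. Below is a minimal standalone sketch of that parsing (the class name RecordSketch is made up for illustration, and the offsets assume NCDC's documented fixed-width layout, which the rendered sample above may not preserve exactly):
// Minimal sketch: pull the year and mean temperature out of one fixed-width GSOD record.
// Assumes NCDC's documented layout: YEARMODA at columns 15-22, TEMP at columns 25-30.
public class RecordSketch {
    public static void main(String[] args) {
        String line = args[0];                        // one record, e.g. a line from input_gsod_2010.txt
        String year = line.substring(14, 18);         // the YYYY part of YEARMODA
        String temp = line.substring(24, 30).trim();  // mean temperature in Fahrenheit; "9999.9" means missing
        System.out.println(year + " -> " + temp);
    }
}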
The goal of preprocessing is to consolidate the archived data into one txt file per year, and to strip the title line from each file so that the map phase can read the records directly. The processing script, process.sh, is as follows:
#!/bin/bash
# Unpack each yearly tarball, strip the title line from every .op file,
# and concatenate the records into a single input_gsod_<year>.txt per year.
for i in {1901..2016}
do
    tar xf ./ncdc/gsod_$i.tar -C ./ncdc
    gunzip ./ncdc/*.gz
    rm -rf ncdc/input_gsod_$i.txt
    touch ncdc/input_gsod_$i.txt
    for file in ./ncdc/*.op
    do
        sed -i '1d' "$file"
        cat "$file" >> ./ncdc/input_gsod_$i.txt
    done
    rm -rf ./ncdc/*.op
    echo "file gsod_$i has been processed"
done
1.3 Loading the Data into HDFS
Create the input directory:
hdfs dfs -mkdir /ncdc
Put the data onto HDFS:
hdfs dfs -put ./ncdc/*.txt /ncdc/
Check the data on HDFS; if the data for every year has been loaded, you are all set:
hdfs dfs -ls /ncdc
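The same check can also be done programmatically through Hadoop's FileSystem API. Here is a minimal sketch, assuming the cluster's configuration files are on the classpath (the class name ListNcdc is made up for illustration):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: list the files under /ncdc to confirm that every year made it in.
public class ListNcdc {
    public static void main(String[] args) throws Exception {
        // Picks up fs.defaultFS from the configuration on the classpath.
        FileSystem fs = FileSystem.get(new Configuration());
        for (FileStatus status : fs.listStatus(new Path("/ncdc"))) {
            System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
        }
    }
}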
2 The MapReduce Program
2.1 The MapReduce Program
package com.oldtrafford.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemperature {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("MaxTemperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
        System.out.println("Finished");
    }

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final String MISSING = "9999.9";

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // The year is the first four characters of YEARMODA (columns 15-22).
            String year = line.substring(14, 18);
            // TEMP (columns 25-30) is the mean temperature in degrees Fahrenheit.
            String temperatureStr = line.substring(24, 30).trim();
            if (!MISSING.equals(temperatureStr)) {
                // Multiply by 10 so the value fits in an int without losing the decimal digit.
                int temperature = (int) (Double.parseDouble(temperatureStr) * 10);
                context.write(new Text(year), new IntWritable(temperature));
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Take the maximum of all temperatures recorded for this year.
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }
}
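One optional improvement that the code above does not use: because taking a maximum is associative and commutative, the reducer can double as a combiner, pre-aggregating each map task's output locally and shrinking the data shuffled across the network. The only change would be one extra line in main():
// Optional: pre-aggregate map output locally; valid because max() is associative and commutative.
job.setCombinerClass(MaxTemperatureReducer.class);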
2.2 Packaging and Running the Code
Packaging the code
Package the MapReduce code with the simplest Maven command (e.g. mvn clean package), which produces a jar. Then copy that jar onto one of the machines in the Hadoop cluster.
Running the job
hadoop jar temperature.jar com.oldtrafford.hadoop.MaxTemperature /ncdc/* /ncdc_output
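Note that the output directory must not already exist when the job is submitted, otherwise FileOutputFormat fails the job right away. When re-running, remove it first:
hdfs dfs -rm -r /ncdc_output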
3 Checking the Results
View the MapReduce output:
hdfs dfs -cat /ncdc_output/*
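Each output line is a year followed by the highest temperature recorded for that year. Since the mapper stored readings multiplied by 10, the values are in tenths of degrees Fahrenheit; divide by 10 to recover the original scale.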