hadoop程序MapReduce之MaxTemperature

需求:求每年当中最高的温度

样本:temp.log 

        2016080623

        2016072330

        2015030420

输出结果:2016 30

              2015 20

MapReduce分析设计:

Mapper分析设计:

1、将文件分割成键值队<k1,v1>,k1代表:行位置,v1代表:一行数据。

2、将这行数据进行分割成<k2,v2>,k2代表:年份,v1代表:温度。

Reduce分析设计:

3、将一些列合并后的相同key的一系列温度<k3,v3>,k3代表:年份,v1代表:list<int>多个温度。

4、统计比较最大温度<k4,v4>,k4代表:年份,v4代表:最大的温度。

程序部分:

TempMapper类:

package com.cn.temperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TempMapper extends Mapper<Object, Text, Text, IntWritable>{
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String lineValue = value.toString();
        String year = lineValue.substring(0, 4);
        int temperature = Integer.parseInt(lineValue.substring(8));
        context.write(new Text(year), new IntWritable(temperature));
    }
}

TempReduce部分:

package com.cn.temperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TempReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxTemp = Integer.MIN_VALUE;
        for(IntWritable value : values){
            maxTemp = Math.max(maxTemp, value.get());
        }
        context.write(key, new IntWritable(maxTemp));
    }
}

MaxTemperature部分:

package com.cn.temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
           System.err.println("Usage: wordcount  ");
           System.exit(2);
        }
        Job job = new Job(conf, "max tempperature");
        
        //运行的jar
        job.setJarByClass(MaxTemperature.class);
        
        //job执行作业时输入和输出文件的路径
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  
        //指定自定义的Mapper和Reducer作为两个阶段的任务处理类
        job.setMapperClass(TempMapper.class);
        job.setReducerClass(TempReduce.class);
          
        //设置最后输出结果的Key和Value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //提交作业并等待它完成
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

记录自己成长的过程。我觉得这点很重要。

原文地址:https://www.cnblogs.com/xubiao/p/5743965.html