MapReduce练习-----学生成绩相关题目

1.统计需求

1、统计每门课程的参考人数和课程平均分
2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数。
3、求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分。

数据及字段说明:

computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

数据解释
数据字段个数不固定:
第一个是课程名称,总共四个课程,computer,math,english,algorithm,
第二个是学生姓名,后面是每次考试的分数,但是每个学生在某门课程中的考试次数不固定。

2.问题一:统计每门课程的参考人数和课程平均分

根据数据分析,可以根据课程分组求解,这里的分组我们是直接在mapper端使用课程作为输出的key进行分组的,这样每门课程所有的记录会在同一个reduce方法中进行处理,mapper端只要准备好每个学生参考某门课的次数和总成绩即可。

import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
/**
 * 统计每门课程的参考人数和课程平均分
 * computer,huangxiaoming,85,86,41,75,93,42,85
 */
public class CourseOne {
 
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            //对数据进行解析,分析数据可知第三个字段是学生在某门课程中的考试次数
            //根据问题可以分析,统计参考人数,只有使用课程作为key,在reduce阶段终极数据条数即可
            //对于课程的平均分要统计该门课程所有学生全部的考试次数,以及总分
            //在mapper阶段,能统计每一个学生在每个课程中的总考试次数和总分
            String[] lines = value.toString().split(",");
            //sum用来统计学生在某门课程中的考试成绩
            long sum = 0L;
            //totalTimes用来统计学生在某门课程中的考试次数
            //computer,huangxiaoming,85,86,41,75,93,42,85
            //首先数据时通过','进行分隔的,所以通过mapper逐行读取然后根据','进行切分得到一个数组
            //然后从第三个元素开始就是某位学生在某门课程中一次考试的成绩
            //所以使用数组长度减去2就是该学生在该课程中的总考试次数
            long totalTimes = lines.length-2;
            //通过循环遍历累加该学生在该课程中的考试成绩
            for (int i = 2; i < lines.length; i++) {
                sum += Long.parseLong(lines[i]);
            }
            //最后的输出,使用课程名称作为key 例如:computer
            //使用拼接字符串的形式创建value,方便reducer阶段的处理
            //使用totalTimes+"_"+sum 这种拼接方式,
            //考试次数  + 总成绩
            context.write(new Text(lines[0]), new Text(totalTimes+"_"+sum));
        }
    }
    
    public static class MyReducer extends Reducer<Text,Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context context)
                throws IOException, InterruptedException {
            //相同的课程会被分到一个组
            //考试人数计数器
            int count = 0;
            //得分累加器
            int totalScore = 0;
            //考试次数计数器
            int examTimes = 0;
            for (Text t : values) {
                String[] arrs = t.toString().split("_");
                count++;
                totalScore += Integer.parseInt(arrs[1]);
                examTimes += Integer.parseInt(arrs[0]);
            }
            
            //求平均分
            float avg = totalScore*1.0F/examTimes;
            //输出结果
            context.write(key, new Text(count+"	"+avg));
        
        }
    }
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(CourseOne.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.setInputPaths(job, new Path("G:/files/mr/day2/q3/input"));
        FileOutputFormat.setOutputPath(job,new Path("G:/files/mr/day2/q3/output1") );
        
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0:1);
    }
}

3.问题二:统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数。

自定义数据类型:CourseBean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
 
public class CourseBean implements WritableComparable<CourseBean>{
    private String course; //课程名
    private String name; //学生姓名
    private float avg; //平均分
    
    public String getCourse() {
        return course;
    }
    public void setCourse(String course) {
        this.course = course;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public float getAvg() {
        return avg;
    }
    public void setAvg(float avg) {
        this.avg = avg;
    }
    
    public CourseBean(String course, String name, float avg) {
        super();
        this.course = course;
        this.name = name;
        this.avg = avg;
    }
    
    public CourseBean() {
 
    }
    
    
    /**
     * 通过toString方法自定义输出类型
     */
    @Override
    public String toString() {
        return course + "	" + name + "	" + avg;
    }
    
    /**
     * 序列化
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeFloat(avg);
    }
    
    /**
     * 反序列化
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avg = in.readFloat();
    }
    
    //比较规则
    @Override
    public int compareTo(CourseBean o) {
        float flag = o.avg - this.avg;
        return flag > 0.0f ? 1:-1;
    }

自定义分区组件:CourseGroupComparator

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
 
public class CoursePartitioner extends Partitioner<CourseBean, NullWritable>{
 
    /*algorithm    6    71.12195
    computer    10    69.77273
    english    9    66.35294
    math    7    73.07843*/
    @Override
    public int getPartition(CourseBean key, NullWritable value, int numPartitions) {
        if("algorithm".equals(key.getCourse())){
            return 0;
        }else if("computer".equals(key.getCourse())){
            return 1;
        }else if("english".equals(key.getCourse())){
            return 2;
        }else{
            return 3;
        }
    }
 
}

mapreduce程序:

import java.io.IOException;
import java.text.DecimalFormat;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
/**
 * 统计每门课程的参考人数和课程平均分
 * 考虑到要需求要根据课程进行分组并对平均值进行排序,这里使用自定义bean的形式来进行处理
 * 因为要将数据根据课程进行分区并写入到不容的文件中,所以这里使用自定partitioner组件进行分区
 * 要注意的是这时候就要设置reduceTask的个数
 * 
 */
public class CourseTwo {
    static Text text = new Text();
 
    public static class MyMapper extends Mapper<LongWritable, Text, CourseBean, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split(",");
            long sum = 0L;
            long totalTimes = lines.length-2;
            for (int i = 2; i < lines.length; i++) {
                sum += Long.parseLong(lines[i]);
            }
            //格式化平均分使用,保留一位有效小数
            DecimalFormat df=new DecimalFormat(".0");
            //计算某个学生在某门课程中的平均分
            float avg = sum*1.0f/totalTimes;
            String b = df.format(avg);
            //构建mapper输出的key
            CourseBean cb = new CourseBean(lines[0],lines[1],Float.parseFloat(b));
            
            context.write(cb, NullWritable.get());
        }
    }
    
    public static class MyReducer extends Reducer<CourseBean, NullWritable,CourseBean, NullWritable>{
        @Override
        protected void reduce(CourseBean key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {
            //因为自定义了分区组件,自定义类型有排序规则,所以这里直接输出就可以了
            for (NullWritable nullWritable : values) {
                context.write(key, nullWritable);
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(CourseTwo.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        
        job.setOutputKeyClass(CourseBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        //使用自定义的分区组件
        job.setPartitionerClass(CoursePartitioner.class);
        //和自定义分区组件同时使用,根据分区的个数设置reduceTask的个数
        job.setNumReduceTasks(4);
        
        FileInputFormat.setInputPaths(job, new Path("G:/files/mr/day2/q3/input"));
        FileOutputFormat.setOutputPath(job,new Path("G:/files/mr/day2/q3/output2") );
        
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0:1);
    }
}

4.问题三:求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分

这里可以看组是一个分组取Top1的问题,转换到这个题目,因为每个学生在某门课程中都参考了多次,所以这里在mapper端要先求出每个学生在某门课程的最高分,将最高分及相关信息输出,在reducer阶段求出每门课程的最大值,由于题目要求输出的是课程,姓名,平均分;那么就要在mapper端将每个学生各科的平均分求出。

通过对于问题的分析,这里采用自定义输出类型的方式来处理,这里使用bean类型,首先要考虑的是学生在某门课程中的最高分,这里要进行分组求max,默认的使用自定义组件中的compareTo( )方法中的字段进行,这样多个字段进行分组造成我们在reduce阶段取值的时候使用循环的次数增加。所以我们自定义分组组件。使用课程进行分组。

自定义数据类型:CourseBean2

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.io.WritableComparable;
 
public class CourseBean2 implements WritableComparable<CourseBean2>{
    private String course;
    private String name;
    private float avg;
    private long maxScore;
    
    
    public long getMaxScore() {
        return maxScore;
    }
    public void setMaxScore(long maxScore) {
        this.maxScore = maxScore;
    }
    public String getCourse() {
        return course;
    }
    public void setCourse(String course) {
        this.course = course;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public float getAvg() {
        return avg;
    }
    public void setAvg(float avg) {
        this.avg = avg;
    }
    
    public CourseBean2(String course, String name, float avg, long maxScore) {
        super();
        this.course = course;
        this.name = name;
        this.avg = avg;
        this.maxScore = maxScore;
    }
    
    public CourseBean2() {
 
    }
    
    
    
    
    @Override
    public String toString() {
        return course+"	"+name + "	" + avg +"	"+maxScore;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeFloat(avg);
        out.writeLong(maxScore);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avg = in.readFloat();
        maxScore = in.readLong();
    }
    @Override
    public int compareTo(CourseBean2 o) {
      /*首先通过课程进行排序,课程相同的通过成绩进行排序
        值得一提的是,使用自定义分组组件指定的分组字段,一定要在comparaTo方法中使用字段得而前面  
        eg: a  
            a b  
            a b c  
            a b c d  
            a b c d e  */
        int index = o.course.compareTo(this.course);
        if(index == 0){
            long flag = o.maxScore - this.maxScore;
            return flag > 0L ? 1:-1;
            
        }else{
            return index > 0L ? 1:-1;
        }
    }
    
}

自定义分组组件:CourseGroupComparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
 
/**
 * 自定义分组组件
 * 1、如果没有定义自定义的分组组件,默认的使用comparaTo方法中的字段进行分组排序
 * 这里要继承WritableComparator类,来进行序列化和比较
 */
public class CourseGroupComparator  extends WritableComparator{
    
    /**
     * 为了解决下面出现空指针的现象,所以在类中声明一个构造函数来进行创建
     */
    public CourseGroupComparator() {
        super(CourseBean2.class,true);
    }
    
    
    /**
     * 如果直接这样使用会出现一个空指针的错误,主要是a,b没有进行构造,所以是空的;
     * 创建一个构造方法就可以进行解决
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CourseBean2 cb1 = (CourseBean2) a;
        CourseBean2 cb2 = (CourseBean2) b;
        //这里是根据课程名称进行处理的
        return cb1.getCourse().compareTo(cb2.getCourse());
    }
}

mapreduce程序:

package com.jh.hive;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 统计每门课程的参考人数和课程平均分
 * 考虑到要需求要根据课程进行分组并对平均值进行排序,这里使用自定义bean的形式来进行处理
 * 因为要将数据根据课程进行分区并写入到不容的文件中,所以这里使用自定partitioner组件进行分区
 * 要注意的是这时候就要设置reduceTask的个数
 * 
 */
public class CountApp3 {
    static class CountMapper extends Mapper<LongWritable, Text, CourseBean2, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, CourseBean2, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] worlds = line.split(",");
            long sum = 0l;
            long ksCount = worlds.length-2;
            long maxScore = Long.parseLong(worlds[2]);
            String course = worlds[0];
            String name = worlds[1];
            for (int i = 2; i < worlds.length; i++) {
                sum += Float.parseFloat(worlds[i]);
                if (Long.parseLong(worlds[i])>maxScore) {
                    maxScore = Long.parseLong(worlds[i]);
                }
            }
            //格式化平均分使用保留一位有效小数
            DecimalFormat df = new DecimalFormat(".0");
            float avg = sum*1.0f/ksCount;
            String format = df.format(avg);
            CourseBean2 courseBean = new CourseBean2(course,name,Float.parseFloat(format),maxScore);
            context.write(courseBean, NullWritable.get());
        }
    }
    static class CountReducer extends Reducer<CourseBean2, NullWritable, CourseBean2, NullWritable>{
        @Override
        protected void reduce(CourseBean2 arg0, Iterable<NullWritable> arg1,
                Reducer<CourseBean2, NullWritable, CourseBean2, NullWritable>.Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (NullWritable nullWritable : arg1) {
                count++;
                if (count==1) {
                    context.write(arg0, nullWritable);
                }
            }
        }
            
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountApp3.class);
        job.setMapperClass(CountMapper.class);
        job.setMapOutputKeyClass(CourseBean2.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(CountReducer.class);
        job.setOutputKeyClass(CourseBean2.class);
        job.setOutputValueClass(NullWritable.class);

        job.setGroupingComparatorClass(CourseGroupComparator.class);//添加分组组件

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?1:0);
    }
}    

转载博客:https://blog.csdn.net/zyz_home/article/details/79937228

原文地址:https://www.cnblogs.com/LEPENGYANG/p/15331578.html