MapReduce--平均分,最高,低分以及及格率的计算

                    MapReduce--平均分,最高,低分以及及格率的计算

计算班级的平均分,以及个人的最高最低分,以及每个班级的及格率。

来先看一下我的数据。

时间			班级		姓名		科目			成绩
20180501 		1708a1 		li 		bishi 			80
20180501 		1708a1 		li 		jishi 			55
20180501 		1708a1 		li 		project 		90
20180501 		1708a1 		li2		bishi 			80
20180501 		1708a1 		li2		jishi 			20
20180501 		1708a1 		li2		project 		90
20180501 		1708a1 		li3		bishi 			50
20180501 		1708a1 		li3		jishi 			70
20180501 		1708a1 		li3		project 		60
20180501 		1708a1 		zhangsan 	bishi 			88
20180501 		1708a1 		zhangsan 	jishi 			55
20180501 		1708a1 		zhangsan 	project 		98
20180501 		1708a1 		lishi 		bishi 			18
20180501 		1708a1 		lishi 		jishi 			15
20180501 		1708a1 		lishi 		project 		15
20180501 		1708a1 		wangwu		bishi 			88
20180501 		1708a1 		wangwu		jishi 			76
20180501 		1708a1 		wangwu		project 		70
20180501 		1708a2 		li1 		bishi 			80
20180501 		1708a2 		li1 		jishi 			71
20180501 		1708a2 		li1 		project 		96
20180501 		1708a2 		li2 		bishi 			80
20180501 		1708a2 		li2 		jishi 			26
20180501 		1708a2 		li2 		project 		90
20180501 		1708a2 		li3 		bishi 			80
20180501 		1708a2 		li3 		jishi 			55
20180501 		1708a2 		li3 		project 		90
20180501 		1708a2 		zhangliang 	bishi 			81
20180501 		1708a2 		zhangliang 	jishi 			55
20180501 		1708a2 		zhangliang 	project 		98
20180501 		1708a2 		liuli 		bishi	 		70
20180501 		1708a2 		liuli 		jishi 			95
20180501 		1708a2 		liuli 		project 		75
20180501 		1708a2 		wangwu 		bishi 			80
20180501 		1708a2 		wangwu 		jishi 			76
20180501 		1708a2 		wangwu 		project 		70
20180501 		1708a2 		zhangxi 	bishi 			18
20180501 		1708a2 		zhangxi 	jishi 			16
20180501 		1708a2 		zhangxi 	project 		10

数据之间是空格。。。。

代码来了 -- 平均分,最高分,最低分

package com.huhu.day01;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 切割文本: 平均分,最高低分
 * 
 * @author huhu_k
 *
 */
public class HomeWork2 {

	// map
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		Text keys = new Text();
		Text values = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// 数据切割方式(文本中的内容)
			// 按行分
			String[] line = value.toString().split(" ");
			keys.set(line[0] + ":" + line[2]);
			values.set(line[3] + ":" + line[4]);
			context.write(keys, values);
		}
	}

	// reduce
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> value, Context context)
				throws IOException, InterruptedException {
			int max = Integer.MIN_VALUE;
			int min = Integer.MAX_VALUE;
			// 和
			int sum = 0;
			// 人数
			int count = 0;
			// 分数
			int score = 0;
			String classs = "";
			for (Text t : value) {
				classs = t.toString().split(":")[0];
				score = Integer.parseInt(t.toString().split(":")[1]);
				if (max < score)
					max = score;
				if (min > score)
					min = score;
				switch (classs) {
				case "bishi":
					score += score * 0.4;
					break;
				case "jishi":
					score += score * 0.3;
					break;
				case "project":
					score += score * 0.3;
					break;
				}
				sum += score;
				count++;
			}
			int avg = (int) sum / count;
			String[] student = key.toString().split(":");
			Text ky = new Text(student[0] + "	" + student[1]);
			context.write(ky, new Text("平均分   " + avg));
			context.write(ky, new Text("最高值为   " + max));
			context.write(ky, new Text("最低值  " + min));
		}

	}

	public static void main(String[] args) throws Exception {

		// 配置容器
		Configuration conf = new Configuration();
		// 创建一个job
		@SuppressWarnings("deprecation")
		Job job = new Job(conf, "MyMapReduce Two");
		// 配置job
		job.setJarByClass(HomeWork2.class);
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// 输入输出
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 执行程序
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);

	}

}

运行结果:




2.及格率

package com.huhu.day01;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 切割文本:及格率
 * 
 * @author huhu_k
 *
 */
public class HomeWork3 {

	// map
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		Text keys = new Text();
		Text values = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// 数据切割方式(文本中的内容)
			// 按行分
			String[] line = value.toString().split(" ");
			keys.set(line[0] + ":" + line[1]);
			context.write(keys, value);
		}
	}

	// reduce
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {
		Map<String, Double> map = new HashMap<>();
		Map<String, String> maps = new HashMap<>();

		@Override
		protected void reduce(Text key, Iterable<Text> value, Context context)
				throws IOException, InterruptedException {
			for (Text t : value) {
				String[] values = t.toString().split(" ");
				String student = values[2] + ":" + values[0] + ":" + values[1];
				String subject = values[3];
				double score = Integer.valueOf(values[4]);
				if ("bishi".equals(subject)) {
					score *= 0.4;
				} else {
					score *= 0.3;
				}
				// 如果map中有学生,累加学生的没门课程的分数
				if (map.containsKey(student)) {
					double scores = map.get(student);
					scores += score;
					map.put(student, scores);
				} else {
					// 第一次进入时不包含,则直接添加
					map.put(student, score);
				}
			}

			for (Map.Entry<String, Double> m : map.entrySet()) {
				String classname = m.getKey().split(":")[2];
				Double score = m.getValue();
				if (maps.containsKey(classname) && score >= 60) {
					String k = Integer.parseInt(maps.get(classname).split(":")[0]) + 1 + "";
					String v = Integer.parseInt(maps.get(classname).split(":")[1]) + 1 + "";
					maps.put(classname, k + ":" + v);
				} else if (maps.containsKey(classname) && score < 60) {
					String k = Integer.parseInt(maps.get(classname).split(":")[0]) + 1 + "";
					String v = Integer.parseInt(maps.get(classname).split(":")[1]) + "";
					maps.put(classname, k + ":" + v);
				} else if (!maps.containsKey(classname) && score < 60) {
					maps.put(classname, "1:0");
				} else if (!maps.containsKey(classname) && score >= 60) {
					maps.put(classname, "1:1");
				}
			}

		}

		@Override
		protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			for (Map.Entry<String, String> m : maps.entrySet()) {
				DecimalFormat d = new DecimalFormat("0.00%");
				double pass = Double.valueOf(m.getValue().split(":")[1]) / Double.valueOf(m.getValue().split(":")[0]);
				context.write(new Text(m.getKey()), new Text("及格率为:" + d.format(pass)));
			}
		}
	}

	public static void main(String[] args) throws Exception {

		// 配置容器
		Configuration conf = new Configuration();
		// 创建一个job
		@SuppressWarnings("deprecation")
		Job job = new Job(conf, "MyMapReduce Count");
		// 配置job
		job.setJarByClass(HomeWork3.class);
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// 输入输出
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 执行程序
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);

	}

}


   MapReduce一个分布式并行离线计算框架。我们只需要知道map(),reduce(),input,output,剩下的由框架完成



基于yarn的工作流程




原文地址:https://www.cnblogs.com/meiLinYa/p/9231026.html