Scala实现MapReduce程序1-----求平均数

输入:

语文成绩:

a  89

b  88

c  90

d  77

数学成绩:

a  80

b  90

c  98

d  98

输出:

a  84.5

b  89

c  94

d  87.5

Scala程序实现:按照名字分组,然后计算总成绩以及课程个数

/**
 * Computes each student's average score across all subjects.
 *
 * Input: whitespace-separated lines of "name score" (one subject score per line,
 * spread across the files under /spark/test/).
 * Output: prints one "name ... average" line per student, average rounded to 2 decimals.
 */
object AverageScore {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local"))
    val one = sc.textFile("/spark/test/", 2)
    one.filter(_.trim.length > 0)
      .map { text =>
        // Split on runs of whitespace: the sample input uses double spaces,
        // which would produce an empty token with split(" ").
        val fields = text.trim.split("\\s+")
        (fields(0), fields(1).toInt) // (student name, score for one subject)
      }
      .groupByKey()
      .map { case (name, scores) =>
        // FIX: the original accumulated text._1.toInt (the student NAME, e.g. "a"),
        // which throws NumberFormatException; we must sum the score values.
        val avg = scores.sum / scores.size.toDouble
        val format = f"$avg%1.2f".toDouble // keep at most 2 decimal places
        (name, format)
      }
      .collect()
      .foreach(x => println("name" + x._1 + "Average score" + x._2))
  }

}

用MapReduce实现:
package HadoopvsSpark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import scala.tools.cmd.gen.AnyVals;

import java.io.IOException;

/**
* Created by Administrator on 2017/5/25.
*/
public class AverageScore {
public static class AverageMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
public void map(Text key,IntWritable value,Context context) throws IOException, InterruptedException {
String line=key.toString();
if(line.trim().length()>0){
String[] arr=line.split( " " );
context.write( new Text( arr[0] ),new IntWritable( Integer.valueOf( arr[1]) ) );
}
}
}

public static class AverageReducer extends Reducer<Text,IntWritable,Text,DoubleWritable>{
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
context.write( key,new DoubleWritable( sum/3.0 ) );
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job=new Job();
job.setJarByClass( AverageScore.class );

job.setMapperClass( AverageMapper.class );
job.setMapOutputKeyClass( Text.class );
job.setMapOutputValueClass( IntWritable.class );

job.setReducerClass( AverageReducer.class );
job.setOutputKeyClass( Text.class );
job.setOutputValueClass( DoubleWritable.class );

FileInputFormat.addInputPath( job,new Path(args[0]) );
Path outputdir=new Path( args[1] );
FileSystem fs=FileSystem.get( conf );
if(fs.exists( outputdir )){
fs.delete( outputdir,true );
}
FileOutputFormat.setOutputPath(job,outputdir );
System.out.println(job.waitForCompletion( true )?0:1);

}
}

参考博客:
http://blog.csdn.net/kwu_ganymede/article/details/50482948

原文地址:https://www.cnblogs.com/sunt9/p/6936352.html