wordcount复习

MapReduce的产生

MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。

2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统和MapReduce的论文,公布了Google的GFS和MapReduce的基本原理和主要设计思想。

可以把MapReduce理解为,把一堆杂乱无章的数据按照某种特征归纳起来,然后处理并得到最后的结果。Map面对的是杂乱无章的互不相关的数据,它解析每个数据,从中提取出key和value,也就是提取了数据的特征。经过MapReduce的Shuffle阶段之后,在Reduce阶段看到的都是已经归纳好的数据了,在此基础上我们可以做进一步的处理以便得到结果

MapReduce会生成大量的临时文件,为了提高效率,它利用Google文件系统来管理和访问这些文件。

主要功能

1)数据划分和计算任务调度

2)数据/代码互定位

3)系统优化

4)出错检测和恢复

优点

易于编程:它简单的实现了一些接口,不需要关注太多底层逻辑(如网络协议等),就可以写一个分布式程序。

良好的扩展性:当计算资源不能得到满足时,通过简单的增加物理机来增强计算能力以及存储能力。

高容错性:Mapreduce设计的初衷就是部署在廉价的pc机器上,这就要求它具有很高的容错性,比如其中一台机器挂了,通过hadoop的管理,此机器上的任务可以转移到其他节点上运行,且此过程不需要人工参与。

适合PB级别以上海量数据存储以及上千台服务器集群并发工作

缺点

不擅长实时计算:无法像mysql一样在秒级别以内返回结果。

**不擅长流式计算(并不是不能实现--Spark Streaming):流式计算的输入是动态的,由于mapreduce自身的设计特点使得mapreduce的输入数据是静态的,不能动态化。即使实现了动态数据的输入,因为其返回结果的速度相对慢,所以也不适合。

不擅长DAG(有向图)式计算:若多个任务存在依赖关系。例如c的输入依赖于b的输出,b的输入依赖于a的输出,而marpduce在执行每一个任务时都会把数据存储到磁盘中,使得程序之间的通信相对慢。

总结:慢

写一个MapReduce的一个案例WordCount

WcMapper.java

 
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import java.io.IOException;
    
    /**前两个泛型为MR框架给mapper程序的输入,输入的对象为一行数据对象,< 行首的偏移量,行的内容>
     * 例如需要处理的源文件为    hello hadoop
     *                       hello flink
     *则此时mapper需要处理的第一个数据对象为0,"hello hadoop"), 0对应LongWritable,“hello hadoop“对应 Text
     *处理完第一个再重复处理第二个。
     *
     * @author xido
    */
    public class WcMapper extends Mapper< LongWritable,Text, Text, IntWritable> {
        private Text word = new Text();
        private IntWritable one = new IntWritable(1);
    
        @Override
    //    重写父类的map方法
    //    Context对象为当前整个mapreduce任务给map的一个输入输出接口,map从Context读取输入处理完输出给Context
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    //      拿到这一行数据
            String line = value.toString();
    
    //      按照空格切分数据装入一个数组容器
            String[] words = line.split(" ");
    
    //      遍历数组,把单词变成(word,1)的形式交给Context
            for (String word:words) {
    //          原为context.write(new Text(word),new IntWritable(1));
    //          由于大量new对象浪费资源,所以定义了成员变量
                this.word.set(word);
                context.write(this.word,this.noe);
            }
        }
    }
    

Reducer.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    
    /**
     * 得到的数据对象为Context聚合之后的数据:map输出给Context,Context将结果进行聚合
     * 例如 mapper输出的值中 将同key的数据聚合(例 (hello,1) (hello,1)聚合为 (hello,{1,1}(一个Iterable对象))
     * 然后再输出给reducer处理
     */
    public class WcReducer  extends Reducer< Text,IntWritable,Text, IntWritable> {
        private IntWritable total = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable< IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
    //      遍历包含n个1的Iterable对象
            for(IntWritable value:values){
    //          此处可以写 sum+=1,但不推荐写1。原因待笔者研究
                sum+=value.get();
            }
    //        包装结果并输出
            total.set(sum);
            context.write(key,total);
        }
    }
    

Driver.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    
    public class WcDriver {
    
        /*
        *前面只是定义了map与reduce的逻辑
        * 但是没有接入MR框架
        *在Driver中配置map与reduce的环境(输入文件输出文件等)
        */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //     1 获取一个job实例 job即一个mapreduce task
                   Job job =  Job.getInstance(new Configuration());
    
    //     2 设置Driver类路径
            job.setJarByClass(WcDriver.class);
    
    //      3 设置mapper与reducer
            job.setMapperClass(WcMapper.class);
            job.setReducerClass(WcReducer.class);
    
    //      4 设置Mapper与Reducer输出的类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
    //      设置reducer输出即总输出
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
    //      5  设置输入输出数据存储的文件夹
    //         执行时Path由main指定
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
    //      6 提交我们的job
            boolean b= job.waitForCompletion(true);
            System.exit(b?0:1);
        }
    }

PS

1 参考了 百度 以及尚硅谷的hadoop课程及ppt

2 写(ban)作(yun)匆忙,各位若发现错误请多交流指正

3 学有余力的同学可以试试 阿里巴巴hadoop笔试

原文地址:https://www.cnblogs.com/xido/p/13839956.html