【2019/8/18】暑假自学——周进度报告6

  这次博客记录下MapReduce模型的编程和相关学习。

  MapReduce的最主要的特点就是移动计算,而不是数据跟着计算走,这个在分布式系统中十分有效,最大的好处就是节约数据移动的开销,用很小的数据流量来完成对数据的分析和计算。

  • MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce
  • 这两者分别有不同功能,在MapReduce中,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的小数据块,这些小数据块可以被多个Map任务并行处理,MapReduce框架会为每个Map任务输入一个数据子集,Map任务生成的结果会继续作为Reduce任务的输入,最终由Reduce任务输出最后结果,并写入到分布式文件系统中。

  在这个过程中,需要关注的是map和reduce的函数编写以及由map向reduce传递的shuffle过程。

  Map的过程一般承担了初步分析的任务,把小数据按照映射规则进行输出。

  Map

  • 输入:<k1,v1>
  • 输出:List(<k2,v2>)
  • 将小数据进一步分析成一批<key,value>对,输入Map函数进行处理;每一个输入的<k1,v1>会输出一批<k2,v2>

  从Map输出到Reduce输入的整个过程可以广义地称为shuffle。从map端的输出结果写入缓存并进行分区、排序、合并和归并,并且通知Reduce来领取归并后的数据(也就是任务)。

  领取到任务后Reduce就可以开始自己的工作。

  Reduce

  • 输入:<k2,List(v2)>
  • 输出:<k3,v3>
  • 输入的结果<k2,List(v2)>中的List(v2)表示是同一批属于同一个v2的value

  之后reduce输出的结果基本就是最终结果,只需要检查是否符合就可以了。


  在网上看到了MapReduce过程的大白话解释——

以下摘自:用通俗易懂的大白话讲解Map/Reduce原理

我问妻子:“你真的想要弄懂什么是MapReduce?” 她很坚定的回答说“是的”。 因此我问道:

: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试)

妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。

妻子: 但这和MapReduce有什么关系?

: 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce.

妻子: 好吧。

:现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?

妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。

: 没错,让我们把MapReduce的概念应用到食谱上。MapReduce其实是两种操作,我来给你详细讲解下。
Map(映射): 把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。 Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。

Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。

妻子: 所以,这就是MapReduce?

: 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。

妻子: 分布式计算? 那是什么?请给我解释下吧。

: 没问题。

: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢?

妻子: 我会找一个能为我大量提供原料的供应商。

:是的…就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。

妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。
:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。
这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。

妻子:但是我怎么会制造出不同种类的番茄酱呢?

:现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的 map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。

  


  下面是记录一次MapReduce的Word count实例。

  在此之前我已经弄好了Hadoop以及eclipse的插件配置工作,能够通过eclipse的编程访问并修改hdfs内容。

  首先创建hdfs内的input文件夹,用来存放输入文件(输入文件是Hadoop的xml配置文件)。

./bin/hdfs dfs -mkdir input
./bin/hdfs dfs -put /usr/local/hadoop/etc/hadoop/*.xml input

  然后就是java代码:

package org.apache.hadoop.examples;
 
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
    public WordCount() {
    }
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

// String[] otherArgs=new String[]{"hdfs://192.168.0.100:9000/user/hadoop/input","hdfs://192.168.0.100:9000/user/hadoop/output"}; /* 直接设置输入参数 */

      if(otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
 
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
 
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
 
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
 
        public IntSumReducer() {
        }
 
        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
 
            IntWritable val;
            for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable)i$.next();
            }
 
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
 
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
 
        public TokenizerMapper() {
        }
 
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
 
            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
 
        }
    }
}

  在这个代码里面有MapReduce的基本定义和运行方式,但我还是看不太懂。。

  之后运行就可以看到在hdfs的output文件夹内出现运行结果。

  


  记录下遇到的问题——

  1.Failed to locate the winutils binary in the hadoop binary path或nullinwinutils.exe

  这个是缺少winutils.exe导致的,也不清楚是干嘛的,安装就对了。。

  安装方式是下载对应版本的一个叫hadoop-common-bin的东西,内容如下

  可以看到有需要的winutils ,解压到一个目录,在系统环境变量里面设置HADOOP_HOME=该目录以及PATH中添加%HADOOP_HOME%in,重启eclipse就可以。

  2.hdfs文件permission denied问题

  我是用户名出问题了,同样是系统环境变量,添加HADOOP_USER_NAME=hadoop(我Linux是hadoop名称的用户),重启eclipse。

原文地址:https://www.cnblogs.com/limitCM/p/11390652.html