MapReduce编程

1.mapreduce的定义

  MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架;

  MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并运行在一个Hadoop集群上;

2.mapreduce的核心思想

  “分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景);

  Map负责“分”,即把复杂的任务分解为若干个“简单任务”来并行处理,可以拆分的前提是这些小任务可以并行计算,彼此之间没有依赖关系;

  Reduce负责“合”,即对map阶段的结果进行全局汇总;

3.mapreduce编程模型

  mapreduce是采用分而治之的思想设计出来的分布式计算框架;复杂、计算量大、耗时长的任务,暂且称为“大任务”;使用单台服务器无法计算或在较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行;最终在汇总每个小任务的结果。

  MapReduce有两个阶段组成:参考

    Map阶段:其分成一个个小任务

    Reduce阶段:汇总小任务的结果

     

  1)Map阶段

    map阶段有一个关键的map();

    map()的输入是键值对,输出是一系列的键值对,输出写入本地磁盘;

  2)Reduce阶段

    reduce阶段有一个关键的函数reduce();

    map()的输入是map()输出的kv对,输出也是一些列的键值对,最终结果写入hdfs;

  3)Map&Reduce

    

   4)MapReduce编程思想(8个步骤)

    Map阶段2个步骤

    ①设置inputformat类,将我们的数据切分成key,value对,输入到第二个步骤

    ②自定义map逻辑,处理我们第一步的输入数据,然后转换成新的key,value对进行输出

    shuffle阶段4个步骤

    ③对输出的key,value对进行分区。相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合

    ④对不同分区的数据按照相同的key进行排序

    ⑤对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)

    ⑥对排序后的数据进行分组,分组的过程中,将相同key的value放到一个集合当中

    reduce阶段2个步骤

    ⑦对多个map的任务进行合并,排序,自定义reduce逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出

    ⑧设置outputformat将输出的key,value对数据进行保存到文件中

  5)hadoop当中常用的数据类型

    hadoop没有沿用java当中基本的数据类型,而是自己进行封装了一套数据类型,其自己封装的类型与java类型对应如下

     

  6)mapreduce编程——词频统计代码

    a)pom文件

<repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--   <verbal>true</verbal>-->
                </configuration>
            </plugin>
 
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
View Code

    b)自定义Mapper类

 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import java.io.IOException;
 
 /**
  * 自定义mapper类需要继承Mapper,有四个泛型,
  * keyin: k1   行偏移量 Long
  * valuein: v1   一行文本内容   String
  * keyout: k2   每一个单词   String
  * valueout : v2   1         int
  * 在hadoop当中没有沿用Java的一些基本类型,使用自己封装了一套基本类型
  * long ==>LongWritable
  * String ==> Text
  * int ==> IntWritable
  *
  */
 public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
      * 继承mapper之后,覆写map方法,每次读取一行数据,都会来调用一下map方法
      * @param key:对应k1
      * @param value:对应v1
      * @param context 上下文对象。承上启下,承接上面步骤发过来的数据,通过context将数据发送到下面的步骤里面去
      * @throws IOException
      * @throws InterruptedException
      * k1   v1
      * 0;hello,world
      *
      * k2 v2
      * hello 1
      * world   1
      */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取我们的一行数据
        String line = value.toString();
        String[] split = line.split(",");
        Text text = new Text();
        IntWritable intWritable = new IntWritable(1);
        for (String word : split) {
            //将每个单词出现都记做1次
            //key2 Text类型
            //v2 IntWritable类型
            text.set(word);
            //将我们的key2 v2写出去到下游
            context.write(text,intWritable);
        }
    }
 }
View Code

    c)自定义Reducer类

 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 
 import java.io.IOException;
 
 public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    //第三步:分区   相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合
    /**
      * 继承Reducer类之后,覆写reduce方法
      * @param key
      * @param values
      * @param context
      * @throws IOException
      * @throws InterruptedException
      */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for (IntWritable value : values) {
            //将我们的结果进行累加
            result += value.get();
        }
        //继续输出我们的数据
        IntWritable intWritable = new IntWritable(result);
        //将我们的数据输出
        context.write(key,intWritable);
    }
 }
View Code

    d)main程序

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /*
 这个类作为mr程序的入口类,这里面写main方法
  */
 public class WordCount extends Configured implements Tool{
    /**
      * 实现Tool接口之后,需要实现一个run方法,
      * 这个run方法用于组装我们的程序的逻辑,其实就是组装八个步骤
      * @param args
      * @return
      * @throws Exception
      */
    @Override
    public int run(String[] args) throws Exception {
        //获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "mrdemo1");
 
        //实际工作当中,程序运行完成之后一般都是打包到集群上面去运行,打成一个jar包
        //如果要打包到集群上面去运行,必须添加以下设置
        job.setJarByClass(WordCount.class);
 
        //第一步:读取文件,解析成key,value对,k1:行偏移量 v1:一行文本内容
        job.setInputFormatClass(TextInputFormat.class);
        //指定我们去哪一个路径读取文件
        TextInputFormat.addInputPath(job,new Path("inputFilePath"));
        //第二步:自定义map逻辑,接受k1   v1 转换成为新的k2   v2输出
        job.setMapperClass(MyMapper.class);
        //设置map阶段输出的key,value的类型,其实就是k2 v2的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //第三步到六步:分区,排序,规约,分组都省略
        //第七步:自定义reduce逻辑
        job.setReducerClass(MyReducer.class);
        //设置key3 value3的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //第八步:输出k3 v3 进行保存
        job.setOutputFormatClass(TextOutputFormat.class);
        //一定要注意,输出路径是需要不存在的,如果存在就报错
        TextOutputFormat.setOutputPath(job,new Path("outputFilePath"));
        //提交job任务
        boolean b = job.waitForCompletion(true);
        return b?0:1;
        /***
          * 第一步:读取文件,解析成key,value对,k1   v1
          * 第二步:自定义map逻辑,接受k1   v1 转换成为新的k2   v2输出
          * 第三步:分区。相同key的数据发送到同一个reduce里面去,key合并,value形成一个集合
          * 第四步:排序   对key2进行排序。字典顺序排序
          * 第五步:规约 combiner过程 调优步骤 可选
          * 第六步:分组
          * 第七步:自定义reduce逻辑接受k2   v2 转换成为新的k3   v3输出
          * 第八步:输出k3 v3 进行保存
          *
          *
          */
    }
    /*
    作为程序的入口类
      */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hello","world");
        //提交run方法之后,得到一个程序的退出状态码
        int run = ToolRunner.run(configuration, new WordCount(), args);
        //根据我们 程序的退出状态码,退出整个进程
        System.exit(run);
    }
 }
View Code

4.hadoop的序列化与反序列化

  Java的序列化(Serializable)是一个重量级的序列化框架,一个对象被序列化后,会附带很多额外的信息,不便于在网络中高效传输;

  Hadoop自己开发了一套序列化机制(Writable),精简,高效。Writeable是一个接口,类实现接口即可序列化;

5.MapTask、Split、ReduceTask

  Reduce Task个数由代码设置:job.setNumReduceTask(3),一般reduce的个数即分区个数;

  Map Task个数由Split(分片)个数决定,每个Split一个map task;

  Split计算公式:Math.max(minSize, Math.min(maxSize, blockSize));

    mapreduce.input.fileinputformat.split.minsize=1 默认值为1  

    mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue  

    blockSize默认为128M;

 6.InputFormat详解

  1)InputFormat

    InputFormat是mapreduce当中用于处理数据输入的一个组件,是最顶级的一个抽象父类,主要用于解决各个地方的数据源输入问题。其中InputFormat的UML类图可以通过idea进行查看

    

    

  2)FileInputFormat常用类介绍

    FileInputFormat类也是InputFormat的一个子类,如果需要操作hdfs上面的文件,基本上都是通过FileInputFormat类来实现的。可以通过FileInputFormat来实现各种格式的文件操作,FileInputFormat的子类实现类图如下

    

     

      

  3)MapTask的数量以及文件的输入切片机制

     

    mapreduce中每个maptask处理一个split的数据量;

    split概念与block很像,block是hdfs当中存储数据的单位,split是mapreduce中每个maptask处理数据量的单位;

    maptask并行度决定机制;

    block是hdfs物理上把数据分成一块一块,splite只是在逻辑上对输入进行分片,并不会在磁盘上将其其分成片进行存储;

    FileInputFormat默认分片是block块的大小,修改maxsize、minsize调整切片大小

  4)CombineTextInputFormat详解

    默认的TextInputFormat切片机制是对任务按照文件规划切片,不管文件多小,都会是单独的切片,交给一个maptask,如果有大量的小文件,就会产生大量的maptask,处理效率比较低。

    在小文件过多的场景使用CombineTextInputFormat可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask处理。

    切片过程:虚拟存储过程和切片过程两部分

      虚拟存储过程:

        ①将输入目录下所有文件大小,一次和设置的setMaxInputSplitSize【CombineTextInputFormat.setMaxInputSplitSize(job, 4194304)】的值作比较;

        ②如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;

        ③当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件分成2个虚拟存储块(防止出现太小切片);

        ex:setMaxInputSplitSize 为4M,输入文件为8.02,逻辑上分成一个4M的虚拟块,剩余大小为4.02,如果按照4m逻辑划分就会出现一个0.02m的虚拟块,所以将4.02文件切为2.01m和2.01m两个虚拟块

          setMaxInputSplitSize 为4m也就满足了setMinInputSplitSize 为 2

      切片过程:

        ①判断虚拟存储的文件与setMaxInputSplitSize 值作比较,大于等于则单独形成一个切片;

        ②如果不大于则与下一个存储文件合并重新执行步骤①;

        ex:5个文件大小分别为1.7M、0.1M、5.1M、3.4M、6.8M

          5个文件有7个虚拟存储块:1.7M、0.1M、(2.55M、2.55M)、3.4M、(3.4M、3.4M)

          最终会形成3个分片:(1.7+0.1+2.55)M、(2.55+3.4)M、(3.4+3.4)M

  5)自定义InputFormat

    mapreduce框架当中提供了很多的文件输入类,用于处理文件数据的输入,如果提供的数据类不足以实现我们的需求,可以通过自定义的InputFormat来实现文件数据的输入。

    自定义MyInputFormat继承InputFormat,重写isSplitable方法(是否需要分片),重写createRecordReader方法

    自定义MyRecordReader继承RecordReader,实现抽象方法

7.partitioner详解

  partition(分区)主要是将相同的数据发送到同一个reduceTask里面去,在mapreduce当中有一个抽象类叫做Partitioner,默认使用的实现类是HashPartitioner;

  自定义partitioner继承Partitioner类重写getPartition方法

  根据设定的partition设置相应的reduce task数量 tip:partition大于reduce数量报错,partition小于reduce数量会生成空文件

8.mapreduce中的排序

  排序是mapReduce框架中的重要操作之一,mapTask和reduceTask均会对数据按照key排序,该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认按照字典排序,且实现方法为快排。

  对于MapTask,它会将处理结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,当出具处理完毕后,对磁盘上所有文件进行归并排序。

  对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

  排序分类:

    部分排序:根据输入记录的键对数据集进行排序,保证输出的每个文件内部都有序

    全排序:最终输出结果只有一个文件,通过只设置一个reduceTask实现,这样mapreduce所提供的并行架构就丧失了

    辅助排序:在Reduce段对key进行分组。应用在key为bean对象时,想让一个或几个字段相同的key进入到同一个reduce时,可以采用分组排序

    二次排序:自定义排序过程中,compareTo判断条件为两个即为二次排序

9.combiner详解

  Combiner(规约)是mapreduce中Mapper和Reducer之外的一种组件;

  Combiner继承自Reducer,但是Combiner是在每一个MapTask所在的节点上运行,而Reducer是接受全局所有Mapper的输出结果;

  Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量;

  Combiner应用的前提是不能影响最终的业务逻辑,且输出的k应该与Reducer的输入k类型对应

  使用:自定义Combiner继承Reducer类,重写reduce方法,使用job.setCombinerClass()设置自定义Combiner组件

10.GroupingComparator分组详解

  GroupingComparator是MapReduce当中reduce端的一个功能组件,主要作用是决定那些数据为一组调用一次reduce的逻辑,默认是每个不同的key作为不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一组,调用一次reduce逻辑。

  分组本质是排序,步骤如下:

    ①自定义类继承WriteableComparator

    ②创建一个构造方法将比较对象传给父类

    ③重写compare(WritableComparable a, WritableComparable b)方法

protected OrderGroupingComparator() {
        super(MyBean.classtrue);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
        // 比较的业务逻辑
        return result;
}
View Code

  设置分组:job.setGroupingComparatorClass(MyGroup);

  tip:重写compare(WritableComparable a, WritableComparable b),不是compare(Object a, Object b)

11.自定义outputFormat

  自定义MyOutputFormat继承FileOutputFormat类,重写RecordWriter方法

  自定义MyRecordWriter继承RecordWriter类,实现抽象方法

原文地址:https://www.cnblogs.com/dan2/p/13724020.html