MapReduce Examples

Overview: how MapReduce works
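MapReduce is a programming model for distributed computation over large data sets. Put simply, a job is split into tasks that run on different machines, and their results are then collected and aggregated.
MapReduce has two core phases, Map and Reduce. Map tasks run independently of one another, and each machine tries to process the data stored in its own HDFS blocks (data locality); Reduce then aggregates the map results.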
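To make the split-then-aggregate idea concrete without a cluster, here is a minimal single-JVM sketch in plain Java (no Hadoop classes). It assumes threads stand in for machines and a word count stands in for the job; the class and method names (SplitAndAggregate, countWords, totalWords) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Single-JVM sketch of "split the work, run the parts independently, aggregate the results".
// Threads stand in for machines; no Hadoop classes are used.
class SplitAndAggregate {

  // Work done independently on one chunk: count the words in these lines.
  static long countWords(List<String> chunk) {
    long n = 0;
    for (String line : chunk) {
      String trimmed = line.trim();
      if (!trimmed.isEmpty()) {
        n += trimmed.split("\\s+").length;
      }
    }
    return n;
  }

  // Split the lines into roughly equal chunks, process them in parallel, then sum the partial counts.
  static long totalWords(List<String> lines, int workers) {
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    try {
      List<Future<Long>> partials = new ArrayList<>();
      int chunkSize = Math.max(1, (lines.size() + workers - 1) / workers);
      for (int start = 0; start < lines.size(); start += chunkSize) {
        List<String> chunk = lines.subList(start, Math.min(start + chunkSize, lines.size()));
        partials.add(pool.submit(() -> countWords(chunk)));
      }
      long total = 0;
      for (Future<Long> partial : partials) {
        total += partial.get(); // collect each worker's result and aggregate it
      }
      return total;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("a worker failed", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<String> lines = List.of(
        "hello hadoop",
        "wille learn hadoop WordCount",
        "but the hadoop is not easy");
    System.out.println(totalWords(lines, 2)); // prints 12
  }
}
```

In real MapReduce the "chunks" are input splits stored on different nodes and the aggregation happens in reduce tasks; the sketch only shows the shape of the computation.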

1. WordCount: counting words

1.1 Input data: test.txt

hello hadoop
wille learn hadoop WordCount
but the hadoop is not easy

1.2 The Map program:

package com.ice.hadoop.test.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String[] words = line.split(" ");
    for (String word : words) {
      context.write(new Text(word), new IntWritable(1));
    }
  }
}

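This defines a Mapper class with a map method. The MapReduce framework calls map once for every line of input it reads.
The four type parameters of Mapper<LongWritable, Text, Text, IntWritable> are: input key type, input value type, output key type, output value type.
After reading a line, the framework passes it in as a key-value pair. By default, the key is the byte offset at which the line starts in the file (a long), and the value is the content of the line (a string).
The output is also a key-value pair: the user decides what the output key is after applying the custom logic, and likewise decides the content and type of the output value.
In this example, the output key is the word (a string) and the output value is a count of 1 for each occurrence (an integer).
These types differ from the usual Java types because MapReduce data has to be transferred between machines and therefore must be serializable. Hadoop defines its own serializable types: LongWritable corresponds to long, Text to String, and IntWritable to int.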
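The following plain-Java sketch imitates what the framework hands to map() by default: one (offset, line) record per line, where the offset is the byte position at which the line starts. It is an approximation that assumes '\n' line endings and UTF-8 text; LineRecordsSketch and its methods are hypothetical helpers, not Hadoop APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of the default input handling (no Hadoop classes):
// one record per line, key = byte offset where the line starts, value = the line text.
class LineRecordsSketch {

  // Assumes '\n' line endings and UTF-8 text.
  static List<Map.Entry<Long, String>> records(String fileContent) {
    List<Map.Entry<Long, String>> out = new ArrayList<>();
    long offset = 0;
    int start = 0;
    while (start < fileContent.length()) {
      int end = fileContent.indexOf('\n', start);
      if (end < 0) {
        end = fileContent.length(); // last line without a trailing newline
      }
      String line = fileContent.substring(start, end);
      out.add(Map.entry(offset, line));
      // The next line starts after this line's bytes plus one byte for '\n'
      offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
      start = end + 1;
    }
    return out;
  }

  // Same logic as WordcountMapper.map: the offset key is ignored, each word becomes (word, 1).
  static List<Map.Entry<String, Integer>> map(long key, String value) {
    List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
    for (String word : value.split(" ")) {
      emitted.add(Map.entry(word, 1));
    }
    return emitted;
  }

  public static void main(String[] args) {
    String testTxt = "hello hadoop\nwille learn hadoop WordCount\nbut the hadoop is not easy\n";
    for (Map.Entry<Long, String> rec : records(testTxt)) {
      System.out.println(rec.getKey() + " -> " + map(rec.getKey(), rec.getValue()));
    }
  }
}
```

For the three lines of test.txt the offsets come out as 0, 13 and 42, which is why the key is a LongWritable rather than a line number.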

1.3 The Reduce program:

package com.ice.hadoop.test.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0; // a primitive avoids re-boxing an Integer on every addition
    for (IntWritable value : values) {
      count += value.get();
    }
    context.write(key, new IntWritable(count));
  }
}

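This defines a Reducer class with a reduce method. The four type parameters of Reducer<Text, IntWritable, Text, IntWritable> are: input key type, input value type, output key type, output value type. The reducer's input types must match the mapper's output types.
Note that the reduce method receives a key of type Text and an iterable collection of values. This is because the map output read by a reduce task looks like this:
(good,1)(good,1)(good,1)(good,1)
By the time it is passed to the reduce method, it has been grouped into:
key:good
value:(1,1,1,1)
So each call to reduce receives one key together with all of the values for that key.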
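The grouping step can be imitated in memory. This sketch (plain Java, hypothetical class ShuffleSketch) collects the (word, 1) pairs, groups them by key in sorted order, and sums each group the way WordCountReducer does. A TreeMap stands in for the shuffle's sort; its String ordering matches Text's byte ordering only for ASCII keys. Output lines use the key, tab, value layout that TextOutputFormat writes by default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the shuffle: group the (word, 1) pairs by key, sorted by key,
// then call a reduce-like function once per key. No Hadoop classes are used.
class ShuffleSketch {

  // TreeMap keeps keys sorted, imitating the sort the shuffle applies to map output.
  static TreeMap<String, List<Integer>> shuffle(List<String> lines) {
    TreeMap<String, List<Integer>> grouped = new TreeMap<>();
    for (String line : lines) {
      for (String word : line.split(" ")) {
        grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1); // map emits (word, 1)
      }
    }
    return grouped;
  }

  // Same logic as WordCountReducer.reduce: sum the values of one key.
  static int reduce(List<Integer> values) {
    int count = 0;
    for (int v : values) {
      count += v;
    }
    return count;
  }

  // Render each result as "key<TAB>count".
  static List<String> run(List<String> lines) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> e : shuffle(lines).entrySet()) {
      out.add(e.getKey() + "\t" + reduce(e.getValue()));
    }
    return out;
  }

  public static void main(String[] args) {
    run(List.of("hello hadoop", "wille learn hadoop WordCount", "but the hadoop is not easy"))
        .forEach(System.out::println);
  }
}
```

Run on the three lines of test.txt, it prints hadoop with a count of 3 and every other word with 1, with WordCount first because uppercase letters sort before lowercase ones.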

1.4 The Main (driver) program

package com.ice.hadoop.test.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMapReduce {

  public static void main(String[] args) throws Exception {
    // Create the configuration object
    Configuration conf = new Configuration();
    // Create the Job object
    Job job = Job.getInstance(conf, "wordCount");
    // Set the Mapper class
    job.setMapperClass(WordcountMapper.class);
    // Set the Reducer class
    job.setReducerClass(WordCountReducer.class);

    // Locate the jar to ship by the class that contains this driver
    job.setJarByClass(WordCountMapReduce.class);

    // Set the map output key/value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Set the reduce (final) output key/value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Set the input path and the output path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Submit the job and wait for it to finish, printing progress
    boolean b = job.waitForCompletion(true);

    if (!b) {
      System.out.println("word count failed!");
    }
    // Exit with a non-zero status on failure so scripts can detect it
    System.exit(b ? 0 : 1);
  }
}

After compiling and packaging the project into a jar, upload the input data to HDFS:

hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put test.txt /wordcount/input

Run the WordCount jar (the output directory must not exist yet, otherwise the job fails):
hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.wordcount.WordCountMapReduce /wordcount/input /wordcount/output

After the job finishes, check the result:
hdfs dfs -cat /wordcount/output/*

2. Hadoop serialization

Why doesn't Hadoop use Java serialization?
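Hadoop's serialization mechanism differs from Java's. Both write objects to a stream, but Java deserialization keeps creating new objects, whereas Hadoop's mechanism lets the user reuse an existing object by reading the fields into it. This reduces Java object allocation and garbage collection and improves efficiency.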
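Compactness is a further difference: Java serialization also writes a stream header and class metadata. This plain-Java sketch (hypothetical class SerializationSizeSketch) writes one long the Writable way, with DataOutputStream.writeLong, and the Java way, with ObjectOutputStream.writeObject, and compares the byte counts. The exact Java-serialization size depends on the JDK, so the sketch does not hard-code it.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Compares the bytes needed to store one long value in two ways (plain Java, no Hadoop classes).
class SerializationSizeSketch {

  // Writable-style: only the raw value is written (a long is always 8 bytes).
  static int writableStyleSize(long value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  // Java serialization: a stream header and class descriptors are written along with the value.
  static int javaSerializationSize(long value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(Long.valueOf(value));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  public static void main(String[] args) {
    System.out.println("Writable-style bytes: " + writableStyleSize(42L)); // prints 8
    System.out.println("Java serialization bytes: " + javaSerializationSize(42L));
  }
}
```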

Hadoop defines its own serialization interface, Writable:

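package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  // Serialize this object's fields to out
  void write(DataOutput out) throws IOException;
  // Deserialize fields from in, overwriting this object's current state
  void readFields(DataInput in) throws IOException;
}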

Implementing the Writable interface provides serialization and deserialization.
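To show the contract without Hadoop on the classpath, the sketch below declares SimpleWritable, a hypothetical stand-in with the same two methods as Writable, and a hypothetical WordCountRecord type. It writes several records into one stream and reads them all back into a single reused object, the object-reuse pattern described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in with the same two methods as org.apache.hadoop.io.Writable,
// declared here so the sketch compiles without Hadoop on the classpath.
interface SimpleWritable {
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

// Hypothetical record type: a word and its count.
class WordCountRecord implements SimpleWritable {
  String word = "";
  int count;

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(word);
    out.writeInt(count);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Overwrite the fields in place, so one instance can be reused for many records
    word = in.readUTF();
    count = in.readInt();
  }
}

class WritableRoundTrip {

  // Serialize all pairs into one stream, then read them back through a single reused object.
  static String[] roundTrip(String[] words, int[] counts) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      WordCountRecord writer = new WordCountRecord();
      for (int i = 0; i < words.length; i++) {
        writer.word = words[i];
        writer.count = counts[i];
        writer.write(out);
      }
      out.flush();

      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
      WordCountRecord reused = new WordCountRecord(); // one object for every record read
      String[] result = new String[words.length];
      for (int i = 0; i < words.length; i++) {
        reused.readFields(in);
        result[i] = reused.word + "=" + reused.count;
      }
      return result;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String[] back = roundTrip(new String[] {"good", "hadoop"}, new int[] {4, 3});
    System.out.println(String.join(", ", back)); // prints good=4, hadoop=3
  }
}
```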

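More often, though, Hadoop needs a type to be both serializable and comparable (keys are sorted during the shuffle), so the interface usually implemented is WritableComparable. The class must also provide a default (no-argument) constructor so that MapReduce can instantiate it. Below is an example of a custom serializable Hadoop type: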

import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;
  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
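Two methods of TextPair matter at runtime: compareTo decides the order of keys during the shuffle, and hashCode decides which reducer a key goes to under Hadoop's default HashPartitioner, which computes (hashCode & Integer.MAX_VALUE) % numReduceTasks. StringPair below is a hypothetical plain-Java analogue that uses String instead of Text; because String.hashCode differs from Text.hashCode, the partition numbers it prints will not match a real job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java analogue of TextPair, using String instead of Text.
class StringPair implements Comparable<StringPair> {
  final String first;
  final String second;

  StringPair(String first, String second) {
    this.first = first;
    this.second = second;
  }

  // Order by first, then by second: the same rule as TextPair.compareTo
  @Override
  public int compareTo(StringPair other) {
    int cmp = first.compareTo(other.first);
    return cmp != 0 ? cmp : second.compareTo(other.second);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof StringPair)) {
      return false;
    }
    StringPair p = (StringPair) o;
    return first.equals(p.first) && second.equals(p.second);
  }

  // Same combining formula as TextPair.hashCode
  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  // The formula of Hadoop's default HashPartitioner: clear the sign bit, then take the remainder
  static int partition(StringPair key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    List<StringPair> keys = new ArrayList<>(List.of(
        new StringPair("b", "a"), new StringPair("a", "z"), new StringPair("a", "b")));
    Collections.sort(keys); // sorted: (a, b), (a, z), (b, a)
    for (StringPair key : keys) {
      System.out.println(key + "\t-> reducer " + partition(key, 3));
    }
  }
}
```

Equal keys always hash to the same reducer, which is why equals and hashCode must agree.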

2.1 Requirements

We need to compute statistics from a mobile-user traffic log. Sample log content:

Phone Upstream Downstream
1252548225 200 1100
1345858685 300 1200
1862538225 400 1300
1545858645 100 300
1502236225 500 1300
1362858685 300 1100

For each user, sum the upstream and downstream traffic, and also compute the total.

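For example, if phone number 13897230503 had two records in the log, the two records would be added together to give (phone, upstream, downstream, total):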

13897230503,500,1600,2100

2.2 Approach

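  • map
    Receives one line of the log: the key is the line's byte offset and the value is the line's content.
    On output, the key should be the phone number and the value a single unit holding the upstream traffic, downstream traffic, and total traffic.
    The phone number is a string (Text), but this composite value cannot be represented by a basic data type, so we define a custom bean and make it serializable.
    key: 13897230503
    value: < upFlow:100, dFlow:300, sumFlow:400 >
  • reduce
    Receives a key identifying a phone number, plus the collection of beans for that phone number.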

For example:
key:
13897230503
value:
< upFlow:400, dFlow:1300, sumFlow:1700 >,
< upFlow:100, dFlow:300, sumFlow:400 >
Iterate over the collection of beans and add up each field to form a new bean, for example:
< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
The final output is:
key: 13897230503
value: < upFlow:500, dFlow:1600, sumFlow:2100 >
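The reduce step for one phone number amounts to a few additions. This plain-Java sketch (hypothetical FlowReduceSketch, with a minimal Flow stand-in for FlowBean) sums upFlow and dFlow and recomputes sumFlow from the totals, which gives the same 2100 as adding the per-record sums.

```java
import java.util.List;

// Plain-Java sketch of the reduce step for one phone number (no Hadoop classes).
class FlowReduceSketch {

  // Minimal stand-in for FlowBean: sumFlow is derived from the other two fields.
  static final class Flow {
    final long upFlow;
    final long dFlow;
    final long sumFlow;

    Flow(long upFlow, long dFlow) {
      this.upFlow = upFlow;
      this.dFlow = dFlow;
      this.sumFlow = upFlow + dFlow;
    }
  }

  // Same idea as the reducer: add up the two flows, then build a new bean from the totals.
  static Flow reduce(Iterable<Flow> values) {
    long sumUp = 0;
    long sumDown = 0;
    for (Flow f : values) {
      sumUp += f.upFlow;
      sumDown += f.dFlow;
    }
    return new Flow(sumUp, sumDown);
  }

  // Format as "phone,up,down,total", the layout of the expected result shown earlier.
  static String csv(String phone, Flow f) {
    return phone + "," + f.upFlow + "," + f.dFlow + "," + f.sumFlow;
  }

  public static void main(String[] args) {
    Flow total = reduce(List.of(new Flow(400, 1300), new Flow(100, 300)));
    System.out.println(csv("13897230503", total)); // prints 13897230503,500,1600,2100
  }
}
```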

2.3 Implementation

First, create the entity class and implement Writable:

package com.ice.hadoop.test.flowbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class FlowBean implements Writable {

  private long upFlow;
  private long dFlow;
  private long sumFlow;

  public FlowBean() {
  }

  public FlowBean(long upFlow, long dFlow) {
    this.upFlow = upFlow;
    this.dFlow = dFlow;
    this.sumFlow = upFlow + dFlow;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow); // readFields must read the fields in exactly the order they are written here
    out.writeLong(dFlow);
    out.writeLong(sumFlow);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    dFlow = in.readLong();
    sumFlow = in.readLong();
  }

  public long getUpFlow() {
    return upFlow;
  }

  public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
  }

  public long getdFlow() {
    return dFlow;
  }

  public void setdFlow(long dFlow) {
    this.dFlow = dFlow;
  }

  public long getSumFlow() {
    return sumFlow;
  }

  public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
  }

  @Override
  public String toString() {
    return "FlowBean{" +
        "upFlow=" + upFlow +
        ", dFlow=" + dFlow +
        ", sumFlow=" + sumFlow +
        '}';
  }
}
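Because the serialized stream carries only raw values and no field names, readFields must read the fields in the same order write wrote them. The plain-Java sketch below (hypothetical FieldOrderSketch) writes upFlow, dFlow, sumFlow as FlowBean.write does, then reads them back once in the right order and once with the first two swapped. Since all three are longs, the wrong order does not throw; it silently swaps the values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Plain-Java demonstration that readFields must mirror the order used in write.
class FieldOrderSketch {

  // Write the three fields in FlowBean.write's order: upFlow, dFlow, sumFlow.
  static byte[] write(long upFlow, long dFlow, long sumFlow) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(upFlow);
      out.writeLong(dFlow);
      out.writeLong(sumFlow);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Read in the same order as written; returns {upFlow, dFlow, sumFlow}.
  static long[] readSameOrder(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long upFlow = in.readLong();
      long dFlow = in.readLong();
      long sumFlow = in.readLong();
      return new long[] {upFlow, dFlow, sumFlow};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Buggy reader that reads dFlow first; returns {upFlow, dFlow, sumFlow} as it (mis)understands them.
  static long[] readSwappedOrder(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long dFlow = in.readLong();  // actually receives upFlow's bytes
      long upFlow = in.readLong(); // actually receives dFlow's bytes
      long sumFlow = in.readLong();
      return new long[] {upFlow, dFlow, sumFlow};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] data = write(100, 300, 400);
    System.out.println(Arrays.toString(readSameOrder(data)));    // prints [100, 300, 400]
    System.out.println(Arrays.toString(readSwappedOrder(data))); // prints [300, 100, 400]
  }
}
```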

The MapReduce program:

package com.ice.hadoop.test.flowbean;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class FlowCount {

  static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      // Split on runs of whitespace so that extra spaces or tabs do not produce empty fields
      String[] fields = line.trim().split("\\s+");
      String phone = fields[0];
      long upFlow = Long.parseLong(fields[1]);
      long dFlow = Long.parseLong(fields[2]);
      context.write(new Text(phone), new FlowBean(upFlow, dFlow));
    }
  }

  static class FlowCountReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
        throws IOException, InterruptedException {

      long sumUpFlow = 0L;
      long sumDFlow = 0L;
      for (FlowBean bean : values) {
        sumUpFlow += bean.getUpFlow();
        sumDFlow += bean.getdFlow();
      }

      FlowBean sumBean = new FlowBean(sumUpFlow, sumDFlow);
      context.write(key, sumBean);
    }
  }

  public static void main(String[] args) throws Exception {
    // Create the configuration object
    Configuration conf = new Configuration();
    // Create the Job object
    Job job = Job.getInstance(conf, "FlowCount");
    // Set the Mapper class
    job.setMapperClass(FlowCountMapper.class);
    // Set the Reducer class
    job.setReducerClass(FlowCountReduce.class);

    // Locate the jar to ship by the class that contains this driver
    job.setJarByClass(FlowCount.class);

    // Set the map output key/value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    // Set the reduce (final) output key/value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // Set the input path and the output path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Submit the job and wait for it to finish
    boolean b = job.waitForCompletion(true);

    System.exit(b ? 0 : 1);
  }

}

Compiling, packaging, and running the job follow the same steps as for WordCount.

3. Merging small files

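Why merge small files? MapReduce processes each small file as at least one separate task: FileInputFormat creates at least one input split per file, and each split gets its own map task. With a very large number of small files, the job creates a very large number of tasks, and the per-task overhead hurts efficiency.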
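The cost can be estimated by counting splits. The sketch below is a simplified plain-Java model of FileInputFormat's split logic: it assumes the split size equals the HDFS block size (128 MB here, the usual default) and applies the same 1.1 slack factor FileInputFormat uses for the last split. SplitCountSketch and its methods are hypothetical.

```java
import java.util.Arrays;

// Simplified model of how FileInputFormat turns files into splits (one map task per split).
// Assumes splitSize == block size and that every file is splittable.
class SplitCountSketch {

  static final double SPLIT_SLOP = 1.1; // slack factor FileInputFormat uses for the last split

  static long splitsForFile(long length, long splitSize) {
    if (length == 0) {
      return 1; // an empty file still gets one (empty) split
    }
    long splits = 0;
    long remaining = length;
    // Cut full-size splits while more than 1.1 split sizes remain
    while ((double) remaining / splitSize > SPLIT_SLOP) {
      splits++;
      remaining -= splitSize;
    }
    if (remaining != 0) {
      splits++; // whatever is left becomes the last split
    }
    return splits;
  }

  static long totalMapTasks(long[] fileLengths, long splitSize) {
    long total = 0;
    for (long length : fileLengths) {
      total += splitsForFile(length, splitSize); // every file yields at least one split
    }
    return total;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    long blockSize = 128 * mb; // assumed HDFS block size
    long[] smallFiles = new long[1000];
    Arrays.fill(smallFiles, mb); // 1,000 files of 1 MB each
    System.out.println(totalMapTasks(smallFiles, blockSize));             // prints 1000
    System.out.println(totalMapTasks(new long[] {1000 * mb}, blockSize)); // prints 8
  }
}
```

Under these assumptions, 1,000 files of 1 MB each need 1,000 map tasks, while the same data in a single 1,000 MB file needs only 8.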

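How to do it: reading files is handled on the map side, by the InputFormat and its RecordReader. To merge small files, we write a custom InputFormat and RecordReader; the RecordReader reads one entire file at a time and wraps it as a single key-value pair. The map task receives the file contents and emits the file name as the key and the file contents as the value. Pay attention to the output format: use SequenceFileOutputFormat, which stores key-value objects in binary form.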

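Because the key-value pairs reaching the reduce side are binary objects rather than plain text, the default output format, TextOutputFormat, is a poor fit: it writes each key and value as text via toString(), so the original file bytes would not be preserved. SequenceFileOutputFormat writes them as binary records instead.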

3.1 Code

package com.ice.hadoop.test.mergefile;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// The value type must be BytesWritable (a byte array) to match MyRecordReader; ByteWritable holds only one byte
public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    MyRecordReader reader = new MyRecordReader();
    reader.initialize(inputSplit, taskAttemptContext);
    // Return the reader; returning null would make the map task fail
    return reader;
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // Make every small file non-splittable so that each file produces exactly one key-value pair
    return false;
  }
}

The createRecordReader method creates an instance of the custom reader defined below:

package com.ice.hadoop.test.mergefile;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) inputSplit;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;

      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {

  }
}

It has three core methods: nextKeyValue, getCurrentKey, and getCurrentValue.

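nextKeyValue prepares the key and value to be passed to the map method, returning true when it has produced a record and false when the input is exhausted; getCurrentKey and getCurrentValue actually return that key and value. So the core mechanism of a RecordReader is: nextKeyValue builds a key-value pair, and getCurrentKey and getCurrentValue hand back the pair it built. Here, nextKeyValue reads the entire file into the value, so it returns true only once per file.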
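The protocol can be imitated in plain Java. In the sketch below, SimpleRecordReader is a hypothetical stand-in for the three core methods, WholeFileReader returns one record per file as MyRecordReader does (the file is modeled as an in-memory byte array rather than read from HDFS), and runMapper mirrors the loop in Hadoop's Mapper.run(), which keeps calling map() while nextKeyValue() returns true.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the three core RecordReader methods (not Hadoop's class).
abstract class SimpleRecordReader<K, V> {
  // Prepare the next record; return false when the input is exhausted
  abstract boolean nextKeyValue();

  abstract K getCurrentKey();

  abstract V getCurrentValue();
}

// Produces exactly one record per file, like MyRecordReader; the file is an in-memory byte array.
class WholeFileReader extends SimpleRecordReader<Void, byte[]> {
  private final byte[] fileContents;
  private byte[] value;
  private boolean processed = false;

  WholeFileReader(byte[] fileContents) {
    this.fileContents = fileContents;
  }

  @Override
  boolean nextKeyValue() {
    if (processed) {
      return false; // the whole file was already returned
    }
    value = fileContents.clone(); // the single record: the entire file content
    processed = true;
    return true;
  }

  @Override
  Void getCurrentKey() {
    return null; // no meaningful key, similar to NullWritable
  }

  @Override
  byte[] getCurrentValue() {
    return value;
  }
}

class RecordLoop {
  // Same loop shape as Hadoop's Mapper.run(): call map() once per record the reader prepares.
  static List<String> runMapper(SimpleRecordReader<Void, byte[]> reader, String fileName) {
    List<String> emitted = new ArrayList<>();
    while (reader.nextKeyValue()) {
      byte[] contents = reader.getCurrentValue();
      // "map": emit (file name, file content), as FileMapper does
      emitted.add(fileName + " -> " + new String(contents, StandardCharsets.UTF_8));
    }
    return emitted;
  }

  public static void main(String[] args) {
    WholeFileReader reader = new WholeFileReader("hello hadoop\n".getBytes(StandardCharsets.UTF_8));
    System.out.println(runMapper(reader, "a.txt").size()); // prints 1
  }
}
```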

The MapReduce program:

package com.ice.hadoop.test.mergefile;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class ManyToOne {

  static class FileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    private Text fileNameKey;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      InputSplit split = context.getInputSplit();
      Path path = ((FileSplit) split).getPath();
      fileNameKey = new Text(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException {
      context.write(fileNameKey, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(ManyToOne.class);

    job.setInputFormatClass(MyInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);
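    job.setMapperClass(FileMapper.class);
    // No reducer class is set, so the default identity Reducer runs; with the default of one
    // reduce task, all (file name, file content) pairs are written to a single SequenceFile

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);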
  }

}
Original article: https://www.cnblogs.com/skyice/p/10421419.html