CombineTextInputFormat 案例

一、核心代码(依托于自定义的WordCount)

1、位置

在设置输入和输出路径前

2、代码

// 设置
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 134217728);

注意:size的值为:1024 * 1024 * nM

3、体现

number of splits:1

二、示例

1、前提条件

创建Maven项目,导入依赖,配置日志

2、Mapper

package com.wt;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.获取第一行
        String line = value.toString();
        // 2.切割
        String[] words = line.split("\s+");
        // 3.输出
        for (String word : words) {
            /*
            * Text k = new Text(); 每个 key 执行一次 map 因此,把 这个放在外面,减少内存消耗
            * new IntWritable(1); 同上
            * */
            k.set(word);
            context.write(k, v);
        }
    }
}
Mapper类

3、Reducer

package com.wt;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable(); // 省内存
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1. 累加求和
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 2. 输出
        v.set(sum);
        context.write(key, v);
    }
}
Reducer类

4、Driver

package com.wt;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    args = new String[]{"E:\a\input", "E:\a\output1"};
    //1、获取配置信息已经封装任务
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //2、设置jar加载路径
    job.setJarByClass(WordCountDriver.class);
    //3、设置map和reduce类
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    //4、设置map输出的k, v
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    //5、设置最终输出kv类型
    job.setOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // 注意: 大小为 4M * 1024 * 1024
    job.setInputFormatClass(CombineTextInputFormat.class);
    CombineTextInputFormat.setMaxInputSplitSize(job, 134217728);
    //6、设置输入和输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //7、提交
    boolean wait = job.waitForCompletion(true);
    System.exit(wait ? 0 : 1);
}
}

注意

没使用CombineTextInputFormat,默认是使用TextInputFormat

原文地址:https://www.cnblogs.com/wt7018/p/13610934.html