Hadoop(20)-MapReduce框架原理-OutputFormat

1.outputFormat接口实现类

2.自定义outputFormat

步骤:

1). 定义一个类继承FileOutputFormat

2). 定义一个类继承RecordWriter,重写write方法

3. 案例

有一个log文件,将包含nty的输出到nty.log文件,其他的输出到other.log

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.nty.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

自定义类继承FileOutputFormat

package com.nty.outputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Output format that hands every task a {@link FilterRecordWrite} as its record writer.
 *
 * author nty
 * date time 2018-12-12 19:28
 */
public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {

    /**
     * Builds the record writer for a task attempt.
     *
     * @param job the task attempt context, forwarded to {@code FilterRecordWrite.init}
     * @return a {@link FilterRecordWrite} on which {@code init(job)} has already been called
     * @throws IOException          if initializing the writer fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        final FilterRecordWrite writer = new FilterRecordWrite();
        // The writer needs the task context to open its output files.
        writer.init(job);
        return writer;
    }
}

自定义RecordWriter,重写write

package com.nty.outputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:29
 */
public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {

    private FSDataOutputStream nty;

    private FSDataOutputStream other;

    //将job通过参数传递过来
    public void init(TaskAttemptContext job) throws IOException {

        String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);

        FileSystem fileSystem = FileSystem.get(job.getConfiguration());

        nty = fileSystem.create(new Path(outDir + "/nty.log"));
        other = fileSystem.create(new Path(outDir + "/other.log"));

    }

    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        String address = value.toString() + "
";

        if(address.contains("nty")) {
            nty.write(address.getBytes());
        } else {
            other.write(address.getBytes());
        }

    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        //关流
        IOUtils.closeStream(nty);
        IOUtils.closeStream(other);
    }
}

Driver类设置

package com.nty.outputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:29
 */
public class FilterDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FilterDriver.class);

        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("d:\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\Hadoop_test_out"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

输出结果

原文地址:https://www.cnblogs.com/duoduotouhenying/p/10110609.html