Hadoop 使用 Avro 排序

在上例中,使用Avro框架求出数据的最大值,本例使用Avro对数据排序,输入依然是之前的样本,输出使用文本(也可以输出Avro格式)。

1、在Avro的Schema中直接设置排序方向。

dataRecord.avsc,放入resources目录下:

{
"type":"record", "name":"WeatherRecord", "doc":"A weather reading", "fields":[ {"name":"year","type":"int"}, {"name":"temperature","type":"int","order":"descending"} ] }

Schema 类(保留原有的 SCHEMA 常量,并新增从资源文件加载 Schema 的方式):

public class AvroSchemas {
    private Schema currentSchema;

    //本例中不使用常量,修改成资源中加载
    public static final Schema SCHEMA = new Schema.Parser().parse("{
" +
            "	"type":"record",
" +
            "	"name":"WeatherRecord",
" +
            "	"doc":"A weather reading",
" +
            "	"fields":[
" +
            "		{"name":"year","type":"int"},
" +
            "		{"name":"temperature","type":"int","order":"descending"}
" +
            "	]	
" +
            "}");

    public AvroSchemas() throws IOException {
        Schema.Parser parser = new Schema.Parser();
        //采用从资源文件中读取Avro数据格式
        this.currentSchema = parser.parse(getClass().getResourceAsStream("dataRecord.avsc"));
    }


    public Schema getCurrentSchema() {
        return currentSchema;
    }
}

2、mapper

/**
 * Mapper that parses each input text line and, for valid lines, emits one Avro record
 * holding {@code year} and {@code temperature}. The same record is written as both
 * the map output key and the map output value.
 */
public class AvroMapper extends Mapper<LongWritable,Text,AvroKey<GenericRecord>,AvroValue<GenericRecord>> {
    /** Line parser, reused for every input line. */
    private RecordParser parser = new RecordParser();
    /** Source of the schema loaded from the resource file. */
    private AvroSchemas schema;
    /** Single record instance, refilled for every valid input line. */
    private GenericRecord record;

    /**
     * Loads the schema and allocates the reusable output record.
     *
     * @throws IOException if {@link AvroSchemas} fails to load the schema
     */
    public AvroMapper() throws IOException {
        schema = new AvroSchemas();
        record = new GenericData.Record(schema.getCurrentSchema());
    }

    /**
     * Parses one line of input and writes it out as an Avro key/value pair.
     * Lines the parser reports as invalid are skipped.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        parser.parse(line);
        if (!parser.isValid()) {
            return;
        }

        record.put("year", parser.getYear());
        record.put("temperature", parser.getData());

        AvroKey<GenericRecord> outKey = new AvroKey<>(record);
        AvroValue<GenericRecord> outValue = new AvroValue<>(record);
        context.write(outKey, outValue);
    }
}

3、reducer

/**
 * Reducer that writes every incoming record as an {@code IntPair(year, temperature)}
 * through {@link MultipleOutputs}, using the record's year as the base output path,
 * so each year ends up in its own output file.
 */
public class AvroReducer extends Reducer<AvroKey<GenericRecord>,AvroValue<GenericRecord>,IntPair,NullWritable> {
    /** Multi-file writer; in this example one output file per year. */
    private MultipleOutputs<IntPair,NullWritable> multipleOutputs;

    /**
     * Called once at the start of the task; creates the multi-file writer.
     *
     * @param context the reducer context the writer is bound to
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    /**
     * Writes each value of the group to the output file named after its year.
     * No sorting is done here; the reducer only forwards the values in the order
     * it receives them.
     */
    @Override
    protected void reduce(AvroKey<GenericRecord> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
        for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            Integer year = (Integer) record.get("year");
            Integer temperature = (Integer) record.get("temperature");
            // The year doubles as the base output path, giving one file per year.
            // (For a single combined output, context.write(...) could be used instead.)
            multipleOutputs.write(new IntPair(year, temperature), NullWritable.get(), year.toString());
        }
    }

    /**
     * Called once at the end of the task; closes the multi-file writer.
     * Without this call the record writers opened by {@link MultipleOutputs} are
     * never closed, and the per-year output may be missing or truncated.
     *
     * @param context the reducer context (unused)
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Guard against setup() having failed before the writer was created.
        if (multipleOutputs != null) {
            multipleOutputs.close();
        }
    }

}

4、job

/**
 * Job driver for the Avro sort example: reads text input, uses Avro records as the
 * map output key and value, and writes text output.
 *
 * <p>Usage: {@code AvroSort [generic options] <input> <output>}
 */
public class AvroSort extends Configured implements Tool {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path;
     *             any existing output path is deleted recursively before the job starts
     * @return 0 if the job succeeded, 1 if it failed, -1 if the arguments are missing
     */
    @Override
    public int run(String[] args) throws Exception {
        // Validate arguments up front instead of failing with ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>%n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.job.ubertask.enable","true");

        Job job = Job.getInstance(conf,"Avro sort");
        job.setJarByClass(AvroSort.class);

        // Map output key/value schemas are registered through AvroJob rather than
        // through the plain Job setters.
        AvroJob.setMapOutputKeySchema(job, AvroSchemas.SCHEMA);
        AvroJob.setMapOutputValueSchema(job, AvroSchemas.SCHEMA);

        job.setMapperClass(AvroMapper.class);
        job.setReducerClass(AvroReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        // Text output; AvroKeyOutputFormat together with AvroJob.setOutputKeySchema
        // would be the alternative for Avro-formatted output.
        job.setOutputFormatClass(TextOutputFormat.class);

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // Remove any existing output directory (recursively) before the job is submitted.
        FileSystem fileSystem = outPath.getFileSystem(conf);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; delegates to {@link ToolRunner} and exits with the
     * code returned by {@link #run(String[])}.
     */
    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new AvroSort(), args);
        System.exit(exitCode);
    }
}
原文地址:https://www.cnblogs.com/asker009/p/10436983.html