8.3hadoop mapreduce 输出格式

1.1  输出格式

1.1.1         TextOutputFormat文本输出

默认输出格式,键值通toString()转为文本,Tab键分隔,属性mapreduce.putput. textoutputformat.separator属性设置分隔符。

1.1.2         二进制输出

(1)   SequenceFileOutputFormat

输出写为顺序文件,格式紧凑,容易压缩。

(2)   SequenceFileAsBinaryOutputFormat

用原始的二进制格式把键值对写入顺序文件。

(3)MapFileOutputFormat

把map文件作为输出,MapFile中键必须按顺序添加。reduce输入键一定是有序的,但是输出不一定,有reduce函数控制,所以MapFileOutputFormat需要额外限制输出有序。

1.1.3         多个文件输出

输出文件名自定义或者输出多个文件。例如将温度数据按照气象局输出文件,每个气象局一个文件。方法时一个气象局对应一个reduce,重写partitioner,同一个气象局的数据放到一个partition分区,reduce个数设置为气象局的个数。可能reduce数量过多,reduce作业数据量分配不均。应该用比较少的reduce做更多的事情,减少额外的开销。所以让集群自行决定分区数,优化性能。这样一个reduce就要按照多个气象局写入多个文件,因此使用MultipleOutput,创建多个name-m-nnnnn或者reduce输出文件name-r-nnnnn,nnnnn为块号。

实例:用MultipleOutput类将整个数据集分区到气象局id命名的文件中

public class PartitionByStationIdMultipleOutputs extends Configured implements Tool

{     //重写map函数

       static class StationMapper extends Mapper<LongWritable,Text,Text,Text>

       {

privare NcdcRecordParser parser=new NcdcRecordParser();

              @override

              protected void map(LongWriteable key,Text value,Context context)throws IOexception,InterruptedException

{

       parser.parse(value);

       context.write(new Text(parser.getStationId()),value);

}

}

reduce类

static class MultipleOutputsReducer extends Reduce<Text,Text,NullWritable,Text>

{

       private MultipleOutputs<NullWritable,Text> multipleOutputs;

       @Override

       protected void setup(Context context) throws IOException,InterruptedException

       {

              multipleoutputs=new MuletipleOuputs<NullWritable,Text>(context);

}

@override

public void reduce(Text key,Iterable<Text> values, Context context) throws IOException,InterruptedException{

       for(Text value:values)

       {

              //键,值,key气象局id作为文件名称

       multipleOutputs.write(NullWritable.get(),value,key.toString());

       }

}

@Override

protect void cleanup(Context context)throws IOException,InterruptedException{

       multipleOutputs.close();

}

@overide

protected int run(String[] args) throws Exception{

       Job job=JobBuilder.parIntputAndOutput(this,getConf(),args);

       if(job==null)

       {

              return -1;

}

job.setMapperClass(StationMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setReduceClass(MultipeOutputsReducer.class);

job.setOutputKeyClass(NullWritable.class);

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throw Exception

{

   int exitCode=ToolRunner.run(new PartitionByStationIdMultipleOutput, args);

System.exit(exitCode);

}

}

}

mapper中用parse解析出了StationId,输出键为StationId,输出值为气象数据。reduce中不用context输出,而是用MutipleOutput,输出键为NullWritable,输出值为气象数据,输出文件名为气象局的Id。

1.1.4         延迟输出

FileOutputFormat子类在数据为空时,也会创建空文件,所以可以使用LazyOutputFormat,有一条记录时才会创建文件。streaming 使用 –LazyOutput参数选项来开启LazyOutputFormat。

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

原文地址:https://www.cnblogs.com/bclshuai/p/12285783.html