Hadoop Gleanings (5) ---- MapReduce Output to Multiple Files / Folders

Today I needed to move some of the data in HBase over to HDFS, and I wanted each record to land automatically in a folder named after its timestamp. I had done similar work before, but some of the details had slipped my mind, so this post records them for next time.
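The whole trick is MultipleOutputs from the mapreduce API: its write(key, value, baseOutputPath) overload takes a base path that is resolved against the job's output directory, and any '/' inside it creates subfolders. A minimal sketch of just that call, assuming a MultipleOutputs<Text, Text> field named mos and an illustrative date string (the complete job below shows the real setup):

    // baseOutputPath is relative to the job's output directory; the '/'
    // creates a per-date subfolder, and whatever follows the last '/' is
    // used as the file name prefix, so this writes to 20140314/-r-00000.
    mos.write(key, value, "20140314/");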

package com.chuntent.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.chuntent.tool.HBaseTool;
import com.chuntent.tool.StringTool;
import com.chuntent.tool.bean.DataRecord;



public class CopyOfGetDataFromHBase4Phrase extends Configured implements Tool{
    
    /**
     * TableMapper<Text, Text> -- the two type parameters are the Mapper's
     * output key type and output value type.
     */
    public static class MyMapper extends TableMapper<Text,Text>{
        

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context)
                throws IOException, InterruptedException {           
            
        	DataRecord dr = new DataRecord(value);
        	// Strip embedded line breaks so each record stays on a single output line.
        	context.write(new Text(key.get()),
        			new Text(dr.toString().replaceAll("\n|\r", "")));
        }
    }

	public static class ReduceByStamp extends Reducer<Text, Text, Text, Text> {
		private MultipleOutputs<Text, Text> mos;
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			mos = new MultipleOutputs<Text, Text>(context);
		}

		@Override
		public void reduce(Text key, Iterable<Text> values,
				Context context) throws IOException, InterruptedException {
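			// The third argument to mos.write is a base path resolved against the
			// job's output directory; since getName returns "<date>/", each record
			// lands in a subfolder named after its date.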
			for(Text text : values){
				mos.write(key, text, getName(key.toString()));				
				context.getCounter("data", "num").increment(1);
			}
		}
		public String getName(String stamp){
			// Return the folder name (the date parsed from the row key), with a
			// trailing '/' so MultipleOutputs treats it as a directory.
			return StringTool.getDateFromRowKey(stamp) + "/";
		}
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			// Closing the stream is mandatory: with a small amount of data,
			// everything would otherwise stay in the buffer and never reach the files.
			mos.close();
		}
	}
 
    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        Configuration conf = HBaseConfiguration.create(HBaseTool.getConfig());
        conf.set("mapred.reduce.slowstart.completed.maps", "0.99");
       
        Job job = new Job(conf,"Move Data");
        job.setJarByClass(CopyOfGetDataFromHBase4Phrase.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(ReduceByStamp.class);
        job.setNumReduceTasks(1);
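        // With a single reduce task every date folder gets exactly one file;
        // more reducers would mean more parallelism but several files per folder.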
        Scan scan = new Scan();  
        // Configure the Mapper to scan the HBase table "news"
        TableMapReduceUtil.initTableMapperJob("news", scan, MyMapper.class, Text.class, Text.class, job);
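        // initTableMapperJob wires up the HBase TableInputFormat with the given
        // Scan and registers MyMapper along with its map output key/value types.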

        Path output = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, output);
        FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(output))
			fs.delete(output, true);
        job.waitForCompletion(true);
		return job.isSuccessful() ? 0 : 1;
    }
    public static void main(String [] args){
    	try{
	    	Configuration conf = new Configuration();
//	    	conf.set(name, value);
			String[] otheragrs = new GenericOptionsParser(conf, args)
					.getRemainingArgs();
			int result = ToolRunner.run(conf, new CopyOfGetDataFromHBase4Phrase(), otheragrs);
			System.exit(result);
    	}catch(Exception e){
    		e.printStackTrace();
    	}
    }
}
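
One more detail worth recording: since every record goes out through MultipleOutputs, the default part-r-00000 file in the root of the output directory ends up (essentially) empty. A minimal tweak to the run() method above avoids creating it, using LazyOutputFormat as a companion to MultipleOutputs; this is an addition, not something the job above already does:

    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

    // Replace job.setOutputFormatClass(SequenceFileOutputFormat.class) with:
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

LazyOutputFormat only creates a RecordWriter when the first record is actually written, so the empty default files never appear.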

  

Original article: https://www.cnblogs.com/nocml/p/3668456.html