hbase1

http://blog.csdn.net/anhuidelinger/article/details/16989771

import java.io.IOException;  
import java.util.HashMap;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.client.Put;  
import org.apache.hadoop.hbase.client.Result;  
import org.apache.hadoop.hbase.client.Scan;  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;  
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;  
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;  
import org.apache.hadoop.hbase.util.Bytes;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.util.GenericOptionsParser;  

public class IndexBuilder {
	// 索引的列族为info ,列为name
	public static final byte[] column = Bytes.toBytes("info");
	public static final byte[] qualifier = Bytes.toBytes("name");
	
	
	public static class Map extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
		private byte[] family;
		// indexes 存储了列与表对对应关系,其中byte[]用于获取列的值作为索引表的键值,ImmutableBytesWritable作为表的名称
		private HashMap<byte[], ImmutableBytesWritable> indexes;
		// 在Map中,对每一行的数据提取出需要建立索引的列的值,加入到索引表中输出。
		protected void map(ImmutableBytesWritable rowKey, Result result,Context context) throws IOException, InterruptedException {
			// original: row 123 attribute:phone 555-1212
	        // index: row 555-1212 INDEX:ROW 123
			for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
				// 获得列名 qualifier
				byte[] qualifier = index.getKey();
				// 索引表名 table
				ImmutableBytesWritable table = index.getValue();
				// 插入的列值为 列名加上行名
				byte[] value = result.getValue(family, qualifier);
				// 以列值作为行键,在列“info:row”中插入行键
				Put put = new Put(value);
				put.add(column, qualifier, rowKey.get());
				// 在table表上执行put操作
				context.write(table, (Writable) put);
			}
		}


		protected void setup(Context context) throws IOException,InterruptedException {
			Configuration configuration = context.getConfiguration();
			String table = configuration.get("index.tablename");
			String[] fields = configuration.getStrings("index.fields");
			String familyName = configuration.get("index.familyname");
			family = Bytes.toBytes(familyName);

			// 初始化indexes   
			// if the table is "people" and the field to index is "email", then the
	        // index table will be called "people-email"
			indexes = new HashMap<byte[], ImmutableBytesWritable>();
			for (String field : fields) {
				indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(
						Bytes.toBytes(table + "-" + field)));
			}
		}
	}
	// Job configuration
	public static Job configureJob(Configuration conf, String[] args)throws IOException {
		String table = args[0];
		String columnFamily = args[1];
		System.out.println(table);
		// 通过Configuration.set()方法传递参数
		conf.set(TableInputFormat.SCAN, ScanToString(new Scan()));
		conf.set(TableInputFormat.INPUT_TABLE, table);
		conf.set("index.tablename", table);
		conf.set("index.familyname", columnFamily);
		String[] fields = new String[args.length - 2];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = args[i + 2];
		}
		conf.setStrings("index.fields", fields);
		conf.set("index.familyname", "attributes");
		// 运行参数配置
		Job job = new Job(conf, table);
		job.setJarByClass(IndexBuilder.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TableInputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);
		return job;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length < 3)System.exit(-1);
		Job job = configureJob(conf, otherArgs);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	static String ScanToString(Scan scan) throws IOException {
	    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
	    return Base64.encodeBytes(proto.toByteArray());
	}
}

  

原文地址:https://www.cnblogs.com/shenbingyu/p/5025968.html