Hbase Client 使用示例

这两天在读淘宝开源出来的DataX,想模仿它写一个离线数据交换组件。读了它读写Hbase的插件的源代码,觉得写得确实比我之前写得好。整理出来,放在这里,向优秀代码学习。关键的地方时它在处理异常的时候考虑的比我周全很多。

先是写Hbase的代码:

/**

 * (C) 2010-2011 Alibaba Group Holding Limited.

 *

 * This program is free software; you can redistribute it and/or

 * modify it under the terms of the GNU General Public License 

 * version 2 as published by the Free Software Foundation. 

 * 

 */



package com.taobao.datax.plugins.writer.hbasewriter;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.client.*;

import org.apache.hadoop.hbase.util.Bytes;



import java.io.IOException;

import java.util.ArrayList;

import java.util.List;





public class HBaseProxy {

	private Configuration config;



	private HTable htable;



	private HBaseAdmin admin;



	private HTableDescriptor descriptor;

	

	private static final int BUFFER_LINE = 1024;

	

	private List<Put> buffer = new ArrayList<Put>(BUFFER_LINE);



	private Put p;



	public static HBaseProxy newProxy(String hbase_conf, String table) 

			throws IOException {

		return new HBaseProxy(hbase_conf, table);

	}

	

	private HBaseProxy(String hbase_conf, String tableName)

			throws IOException {

		Configuration conf = new Configuration();

		conf.addResource(new Path(hbase_conf));

		config = new Configuration(conf);

		htable = new HTable(config, tableName);

		admin = new HBaseAdmin(config);

		descriptor = htable.getTableDescriptor();

	}



	public void setBufferSize(int bufferSize) throws IOException {

		this.htable.setWriteBufferSize(bufferSize);

	}

	

	public void setHTable(String tableName) throws IOException {

		this.htable = new HTable(config, tableName);

	}



	public void setAutoFlush(boolean autoflush) {

		this.htable.setAutoFlush(autoflush);

	}



	public boolean check() throws IOException {

		if (!admin.isMasterRunning()) {

			throw new IllegalStateException("hbase master is not running!");

		}

		if (!admin.tableExists(htable.getTableName())) {

			throw new IllegalStateException("hbase table " + Bytes.toString(htable.getTableName())

					+ " is not existed!");

		}

		if (!admin.isTableAvailable(htable.getTableName())) {

			throw new IllegalStateException("hbase table " + Bytes.toString(htable.getTableName())

					+ " is not available!");

		}

		if (!admin.isTableEnabled(htable.getTableName())) {

			throw new IllegalStateException("hbase table " + Bytes.toString(htable.getTableName())

					+ " is disable!");

		}



		return true;

	}



	public void close() throws IOException {

		htable.close();

	}



	public void deleteTable() throws IOException {

		Scan s = new Scan();

		ResultScanner scanner = htable.getScanner(s);

		for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {

			htable.delete(new Delete(rr.getRow()));

		}

		scanner.close();

	}



	public void truncateTable() throws IOException {

		admin.disableTable(htable.getTableName());

		admin.deleteTable(htable.getTableName());

		admin.createTable(descriptor);

	}



	public void flush() throws IOException {

		if (!buffer.isEmpty()) {

			htable.put(buffer);

			buffer.clear();

		}

		htable.flushCommits();

	}



	public void prepare(byte[] rowKey) {

		this.p = new Put(rowKey);

    }

	

	public Put add(byte[] family, byte[] qualifier, byte[] value) {

		return this.p.add(family, qualifier, value);

	}

	

	public void insert() throws IOException {

		buffer.add(this.p);

		if (buffer.size() >= BUFFER_LINE) {

			htable.put(buffer);

			buffer.clear();

		}

	}

}

  

原文地址:https://www.cnblogs.com/qgxiaoguang/p/2921045.html