这里只介绍访问HBase最常用的三种方式:HBase shell、Java API和MapReduce。
1.HBase shell
HBase的命令行工具是最简单的接口,主要用于HBase管理
首先启动HBase,然后执行 hbase shell 进入命令行
帮助
hbase(main):001:0> help
查看HBase服务器状态
hbase(main):001:0> status
查询HBase版本
hbase(main):002:0> version
DDL操作
1.创建一个member表(包含member_id、address、info三个列簇)
hbase(main):013:0> create 'member','member_id','address','info'
2.查看所有的表
hbase(main):006:0> list
3.查看表结构
hbase(main):007:0> describe 'member'
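4.删除一个列簇
hbase(main):008:0> alter 'member',{NAME=>'member_id',METHOD=>'delete'}
5、查看表是否存在
hbase(main):009:0> exists 'member'
6、判断表是否为"enable"
hbase(main):010:0> is_enabled 'member'
7、删除一个表(删除前必须先disable)
hbase(main):011:0> disable 'member'
hbase(main):012:0> drop 'member'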
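DML操作
1、创建member表
hbase(main):001:0> create 'member','member_id','address','info'
删除其中的member_id列簇(一张表的列簇数一般不超过两个)
hbase(main):002:0> alter 'member',{NAME=>'member_id',METHOD=>'delete'}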
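2、往member表插入数据
hbase(main):003:0> put 'member','djt','info:age','28'
hbase(main):004:0> put 'member','djt','address:city','beijing'
3、扫描查看数据
hbase(main):005:0> scan 'member'
4、获取数据
获取一个rowkey的所有数据
hbase(main):006:0> get 'member','djt'
获取一个rowkey,一个列簇的所有数据
hbase(main):007:0> get 'member','djt','info'
获取一个rowkey,一个列簇中一个列的所有数据
hbase(main):008:0> get 'member','djt','info:age'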
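5、更新数据(对同一单元格再次put即为更新)
hbase(main):009:0> put 'member','djt','info:age','29'
6、删除列簇中其中一列
hbase(main):010:0> delete 'member','djt','info:age'
7、统计表中总的行数
hbase(main):011:0> count 'member'
8、清空表中数据(truncate会先disable并删除表,再按原结构重建)
hbase(main):012:0> truncate 'member'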
2.Java API
最常规且最高效的访问方式
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.hbase.HBaseConfiguration; 5 import org.apache.hadoop.hbase.HColumnDescriptor; 6 import org.apache.hadoop.hbase.HTableDescriptor; 7 import org.apache.hadoop.hbase.KeyValue; 8 import org.apache.hadoop.hbase.MasterNotRunningException; 9 import org.apache.hadoop.hbase.ZooKeeperConnectionException; 10 import org.apache.hadoop.hbase.client.Delete; 11 import org.apache.hadoop.hbase.client.Get; 12 import org.apache.hadoop.hbase.client.HBaseAdmin; 13 import org.apache.hadoop.hbase.client.HTable; 14 import org.apache.hadoop.hbase.client.Put; 15 import org.apache.hadoop.hbase.client.Result; 16 import org.apache.hadoop.hbase.client.ResultScanner; 17 import org.apache.hadoop.hbase.client.Scan; 18 import org.apache.hadoop.hbase.util.Bytes; 19 20 public class HbaseTest { 21 public static Configuration conf; 22 static{ 23 conf = HBaseConfiguration.create();//第一步 24 conf.set("hbase.zookeeper.quorum", "header-2,core-1,core-2"); 25 conf.set("hbase.zookeeper.property.clientPort", "2181"); 26 conf.set("hbase.master", "header-1:60000"); 27 } 28 29 public static void main(String[] args) throws IOException{ 30 //createTable("member"); 31 //insertDataByPut("member"); 32 //QueryByGet("member"); 33 QueryByScan("member"); 34 //DeleteData("member"); 35 } 36 37 38 39 /** 40 * 创建表 通过HBaseAdmin对象操作 41 * 42 * @param tablename 43 * @throws IOException 44 * @throws ZooKeeperConnectionException 45 * @throws MasterNotRunningException 46 * 47 */ 48 public static void createTable(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { 49 //创建HBaseAdmin对象 50 HBaseAdmin hBaseAdmin = new HBaseAdmin(conf); 51 //判断表是否存在,若存在就删除 52 if(hBaseAdmin.tableExists(tableName)){ 53 hBaseAdmin.disableTable(tableName); 54 hBaseAdmin.deleteTable(tableName); 55 } 56 HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); 57 //添加Family 58 tableDescriptor.addFamily(new 
HColumnDescriptor("info")); 59 tableDescriptor.addFamily(new HColumnDescriptor("address")); 60 //创建表 61 hBaseAdmin.createTable(tableDescriptor); 62 //释放资源 63 hBaseAdmin.close(); 64 } 65 66 /** 67 * 68 * @param tableName 69 * @throws IOException 70 */ 71 @SuppressWarnings("deprecation") 72 public static void insertDataByPut(String tableName) throws IOException { 73 //第二步 获取句柄,传入静态配置和表名称 74 HTable table = new HTable(conf, tableName); 75 76 //添加rowkey,添加数据, 通过getBytes方法将string类型都转化为字节流 77 Put put1 = new Put(getBytes("djt")); 78 put1.add(getBytes("address"), getBytes("country"), getBytes("china")); 79 put1.add(getBytes("address"), getBytes("province"), getBytes("beijing")); 80 put1.add(getBytes("address"), getBytes("city"), getBytes("beijing")); 81 82 put1.add(getBytes("info"), getBytes("age"), getBytes("28")); 83 put1.add(getBytes("info"), getBytes("birthdy"), getBytes("1998-12-23")); 84 put1.add(getBytes("info"), getBytes("company"), getBytes("dajiang")); 85 86 //第三步 87 table.put(put1); 88 89 //释放资源 90 table.close(); 91 } 92 93 /** 94 * 查询一条记录 95 * @param tableName 96 * @throws IOException 97 */ 98 public static void QueryByGet(String tableName) throws IOException { 99 //第二步 100 HTable table = new HTable(conf, tableName); 101 //根据rowkey查询 102 Get get = new Get(getBytes("djt")); 103 Result r = table.get(get); 104 System.out.println("获得到rowkey:" + new String(r.getRow())); 105 for(KeyValue keyvalue : r.raw()){ 106 System.out.println("列簇:" + new String(keyvalue.getFamily()) 107 + "====列:" + new String(keyvalue.getQualifier()) 108 + "====值:" + new String(keyvalue.getValue())); 109 } 110 table.close(); 111 } 112 113 114 115 /** 116 * 扫描 117 * @param tableName 118 * @throws IOException 119 */ 120 public static void QueryByScan(String tableName) throws IOException { 121 // 第二步 122 HTable table = new HTable(conf, tableName); 123 Scan scan = new Scan(); 124 //指定需要扫描的列簇,列.如果不指定就是全表扫描 125 scan.addColumn(getBytes("info"), getBytes("company")); 126 ResultScanner scanner = 
table.getScanner(scan); 127 for(Result r : scanner){ 128 System.out.println("获得到rowkey:" + new String(r.getRow())); 129 for(KeyValue kv : r.raw()){ 130 System.out.println("列簇:" + new String(kv.getFamily()) 131 + "====列:" + new String(kv.getQualifier()) 132 + "====值 :" + new String(kv.getValue())); 133 } 134 } 135 //释放资源 136 scanner.close(); 137 table.close(); 138 } 139 140 141 142 /** 143 * 删除一条数据 144 * @param tableName 145 * @throws IOException 146 */ 147 public static void DeleteData(String tableName) throws IOException { 148 // 第二步 149 HTable table = new HTable(conf, tableName); 150 151 Delete delete = new Delete(getBytes("djt")); 152 delete.addColumn(getBytes("info"), getBytes("age")); 153 154 table.delete(delete); 155 //释放资源 156 table.close(); 157 } 158 159 /** 160 * 转换byte数组(string类型都转化为字节流) 161 */ 162 public static byte[] getBytes(String str){ 163 if(str == null) 164 str = ""; 165 return Bytes.toBytes(str); 166 } 167 }
3.MapReduce
直接使用MapReduce作业处理HBase数据。下面两个示例分别演示:把HDFS上的文本做词频统计后写入HBase表wordcount,以及读取wordcount表的数据写回HDFS。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** * 将hdfs里面的数据导入hbase * @author Administrator * */ public class MapReduceWriteHbaseDriver { public static class WordCountMapperHbase extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); //输出到hbase的key类型为ImmutableBytesWritable context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one); } } } public static class WordCountReducerHbase extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{ private IntWritable result = new IntWritable(); public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int sum = 0; for(IntWritable val : values){ sum += val.get(); } //put实例化 key代表主键,每个单词存一行 Put put = new Put(key.get()); //三个参数分别为:列簇content 列count 列值为词频 put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum))); context.write(key, 
put); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ String tableName = "wordcount";//hbase数据库表名 也可以通过命令行传入表名args Configuration conf = HBaseConfiguration.create();//实例化Configuration conf.set("hbase.zookeeper.quorum", "header-2,core-1,core-2"); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("hbase.master", "header-1"); //如果表已经存在就先删除 HBaseAdmin admin = new HBaseAdmin(conf); if(admin.tableExists(tableName)){ admin.disableTable(tableName); admin.deleteTable(tableName); } HTableDescriptor htd = new HTableDescriptor(tableName);//指定表名 HColumnDescriptor hcd = new HColumnDescriptor("content");//指定列簇名 htd.addFamily(hcd);//创建列簇 admin.createTable(htd);//创建表 Job job = new Job(conf, "import from hdfs to hbase"); job.setJarByClass(MapReduceWriteHbaseDriver.class); job.setMapperClass(WordCountMapperHbase.class); //设置插入hbase时的相关操作 TableMapReduceUtil.initTableReducerJob(tableName, WordCountReducerHbase.class, job, null, null, null, null, false); //map输出 job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(IntWritable.class); //reduce输出 job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); //读取数据 FileInputFormat.addInputPaths(job, "hdfs://header-1:9000/user/test.txt"); System.out.println(job.waitForCompletion(true) ? 0 : 1); } }
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 读取hbse数据存入HDFS * @author Administrator * */ public class MapReduceReaderHbaseDriver { public static class WordCountHBaseMapper extends TableMapper<Text, Text>{ protected void map(ImmutableBytesWritable key, Result values,Context context) throws IOException, InterruptedException{ StringBuffer sb = new StringBuffer(""); //获取列簇content下面的值 for(java.util.Map.Entry<byte[], byte[]> value : values.getFamilyMap("content".getBytes()).entrySet()){ String str = new String(value.getValue()); if(str != null){ sb.append(str); } context.write(new Text(key.get()), new Text(new String(sb))); } } } public static class WordCountHBaseReducer extends Reducer<Text, Text, Text, Text>{ private Text result = new Text(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ for(Text val : values){ result.set(val); context.write(key, result); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String tableName = "wordcount";//表名称 Configuration conf = HBaseConfiguration.create();//实例化Configuration conf.set("hbase.zookeeper.quorum", "header-2,core-1,core-2"); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("hbase.master", "header-1:60000"); Job job = new Job(conf, "import from hbase to hdfs"); job.setJarByClass(MapReduceReaderHbaseDriver.class); 
job.setReducerClass(WordCountHBaseReducer.class); //配置读取hbase的相关操作 TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WordCountHBaseMapper.class, Text.class, Text.class, job, false); //输出路径 FileOutputFormat.setOutputPath(job, new Path("hdfs://header-1:9000/user/out")); System.out.println(job.waitForCompletion(true) ? 0 : 1); } }