import org.apache.hadoop.hbase.{HTableDescriptor,HColumnDescriptor,HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory,Put,Get,Delete,Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConversions._
val conf=HBaseConfiguration.create()
//Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
val conn=ConnectionFactory.createConnection(conf)
//从Connection获得 Admin 对象(相当于以前的 HAdmin)
val admin=conn.getAdmin
//本例将操作的表名
val userTable=TableName.valueOf(tablename)
if(admin.tableExists(userTable)){
println("Table exists!")
//admin.disableTable(userTable)
//admin.deleteTable(userTable)
//exit()
}else{
val tableDesc=new HTableDescriptor(userTable)
tableDesc.addFamily(new HColumnDescriptor(cf.getBytes))
admin.createTable(tableDesc)
println("Create table success!")
}
val table=conn.getTable(userTable)
//准备插入一条 key 为 id001 的数据
val p=new Put("id001".getBytes())
//为put操作指定 column 和 value (以前的 put.add 方法被弃用了)
p.addColumn(cf.getBytes,qulified.getBytes,"value1".getBytes)
table.put(p)
//查询某条数据
val g=new Get("id001".getBytes)
val result=table.get(g)
val value=Bytes.toString(result.getValue(cf.getBytes,qulified.getBytes))
println("GET id001:"+value)
//扫描数据
val s = new Scan()
s.addColumn(cf.getBytes,qulified.getBytes)
val scanner = table.getScanner(s)
try{
for(r <- scanner){
println(cf+":"+qulified+"="+Bytes.toString(r.getValue(cf.getBytes,qulified.getBytes)))
}
}finally {
//确保scanner关闭
scanner.close()
}
//删除某条数据,操作方式与 Put 类似
val d = new Delete("id001".getBytes)
d.addColumn(cf.getBytes,qulified.getBytes)
table.delete(d)
table.close()
conn.close()
name := "HTest"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies++= Seq(
"mysql" % "mysql-connector-java" % "5.1.38",
"org.apache.spark" %% "spark-core" % "1.5.2",
"org.apache.hbase" % "hbase-client" % "1.1.3",
"org.apache.hbase" % "hbase-common" % "1.1.3",
"org.apache.hbase" % "hbase-server" % "1.1.3"
)
resolvers+="OS China" at "http://maven.oschina.net/content/groups/public/"