Hbase Java Api操作

注意:pom.xml文件添加(hbase版本和服务器上的版本可以不一致,但尽量保证一致)

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.4</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>

1、工具类HBaseHelper.java

  1 package com.vue.demo.utils;
  2 
  3 
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.hbase.Cell;
  6 import org.apache.hadoop.hbase.CompareOperator;
  7 import org.apache.hadoop.hbase.NamespaceDescriptor;
  8 import org.apache.hadoop.hbase.TableName;
  9 import org.apache.hadoop.hbase.client.*;
 10 import org.apache.hadoop.hbase.filter.*;
 11 import org.apache.hadoop.hbase.util.Bytes;
 12 import org.apache.hadoop.hbase.client.Result;
 13 import org.slf4j.Logger;
 14 import org.slf4j.LoggerFactory;
 15 
 16 import java.io.Closeable;
 17 import java.io.IOException;
 18 import java.util.ArrayList;
 19 import java.util.HashMap;
 20 import java.util.List;
 21 import java.util.Map;
 22 
 23 /**
 24  * @author yangwj
 25  * @date 2020/9/7 10:09
 26  * @desc
 27  */
 28 public class HBaseHelper implements Closeable {
 29 
 30     private static final Logger log = LoggerFactory.getLogger(HBaseHelper.class);
 31 
 32     private Configuration configuration = null;
 33     private Connection connection = null;
 34     private Admin admin = null;
 35 
 36     /**构造方法*/
 37     private HBaseHelper(Configuration configuration) throws IOException {
 38         this.configuration = configuration;
 39         this.connection = ConnectionFactory.createConnection(this.configuration);
 40         admin = this.connection.getAdmin();
 41     }
 42 
 43     /**获取对象*/
 44     public static HBaseHelper getHBaseHelper(Configuration configuration) throws IOException {
 45         return new HBaseHelper(configuration);
 46     }
 47 
 48     @Override
 49     public void close() throws IOException {
 50         admin.close();
 51         connection.close();
 52     }
 53 
 54     public Connection getConnection() {
 55         return connection;
 56     }
 57 
 58     public Configuration getConfiguration() {
 59         return configuration;
 60     }
 61 
 62     /**创建命名空间:数据隔离*/
 63     public void createNamespace(String namespace) {
 64         try {
 65             NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build();
 66             admin.createNamespace(nd);
 67         } catch (Exception e) {
 68             log.error("[createNamespace]Error: {}",e.getMessage());
 69         }
 70     }
 71 
 72     /**删除命名空间*/
 73     public void dropNamespace(String namespace, boolean force) {
 74         try {
 75             if (force) {
 76                 TableName[] tableNames = admin.listTableNamesByNamespace(namespace);
 77                 for (TableName name : tableNames) {
 78                     admin.disableTable(name);
 79                     admin.deleteTable(name);
 80                 }
 81             }
 82         } catch (Exception e) {
 83             log.error("[dropNamespace]Error: {}",e.getMessage());
 84         }
 85         try {
 86             admin.deleteNamespace(namespace);
 87         } catch (IOException e) {
 88             log.error("[dropNamespace]Error: {}",e.getMessage());
 89         }
 90     }
 91 
 92     /**判断是否存在表*/
 93     public boolean existsTable(String table)
 94             throws IOException {
 95         return existsTable(TableName.valueOf(table));
 96     }
 97 
 98     public boolean existsTable(TableName table)
 99             throws IOException {
100         return admin.tableExists(table);
101     }
102 
103     /**
104      * 创建表
105      * @param table 表名
106      * @param colfams 列簇
107      * @throws IOException
108      */
109     public void createTable(String table, String... colfams)
110             throws IOException {
111         createTable(TableName.valueOf(table), 1, null, colfams);
112     }
113 
114     public void createTable(TableName table, String... colfams)
115             throws IOException {
116         createTable(table, 1, null, colfams);
117     }
118 
119     public void createTable(String table, int maxVersions, String... colfams)
120             throws IOException {
121         createTable(TableName.valueOf(table), maxVersions, null, colfams);
122     }
123 
124     /**
125      * 创建表
126      * @param table 表名
127      * @param maxVersions timestamp
128      * @param colfams 列簇
129      * @throws IOException
130      */
131     public void createTable(TableName table, int maxVersions, String... colfams)
132             throws IOException {
133         createTable(table, maxVersions, null, colfams);
134     }
135 
136 
137     public void createTable(String table, byte[][] splitKeys, String... colfams)
138             throws IOException {
139         createTable(TableName.valueOf(table), 1, splitKeys, colfams);
140     }
141 
142     /**
143      * 创建表
144      * @param table 表名
145      * @param maxVersions timestamp
146      * @param splitKeys
147      * @param colfams 列簇
148      * @throws IOException
149      */
150     public void createTable(TableName table, int maxVersions, byte[][] splitKeys,
151                             String... colfams)
152             throws IOException {
153         //表描述器构造器
154         TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);
155 
156         //列族描述构造器
157         ColumnFamilyDescriptorBuilder cfDescBuilder;
158 
159         //列描述器
160         ColumnFamilyDescriptor cfDesc;
161 
162 
163         for (String cf : colfams) {
164             cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
165             cfDescBuilder.setMaxVersions(maxVersions);
166             cfDesc = cfDescBuilder.build();
167             tableDescriptorBuilder.setColumnFamily(cfDesc);
168         }
169         //获得表描述器
170         TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
171 
172         if (splitKeys != null) {
173             admin.createTable(tableDescriptor, splitKeys);
174         } else {
175             admin.createTable(tableDescriptor);
176         }
177     }
178 
179     /**禁用表*/
180     public void disableTable(String table) throws IOException {
181         disableTable(TableName.valueOf(table));
182     }
183 
184     public void disableTable(TableName table) throws IOException {
185         admin.disableTable(table);
186     }
187 
188     /**删除表*/
189     public void dropTable(String table) throws IOException {
190         dropTable(TableName.valueOf(table));
191     }
192 
193     public void dropTable(TableName table) throws IOException {
194         if (existsTable(table)) {
195             if (admin.isTableEnabled(table)) {disableTable(table);}
196             admin.deleteTable(table);
197         }
198     }
199 
200 
201     public void put(String table, String row, String fam, String qual,
202                     String val) throws IOException {
203         put(TableName.valueOf(table), row, fam, qual, val);
204     }
205 
206     /**插入或更新单行*/
207     public void put(TableName table, String row, String fam, String qual,
208                     String val) throws IOException {
209         Table tbl = connection.getTable(table);
210         Put put = new Put(Bytes.toBytes(row));
211         put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), Bytes.toBytes(val));
212         tbl.put(put);
213         tbl.close();
214     }
215 
216     public void put(String table, String row, String fam, String qual, long ts,
217                     String val) throws IOException {
218         put(TableName.valueOf(table), row, fam, qual, ts, val);
219     }
220 
221     /**带时间戳插入或更新单行*/
222     public void put(TableName table, String row, String fam, String qual, long ts,
223                     String val) throws IOException {
224         Table tbl = connection.getTable(table);
225         Put put = new Put(Bytes.toBytes(row));
226         put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), ts,
227                 Bytes.toBytes(val));
228         tbl.put(put);
229         tbl.close();
230     }
231 
232     /**插入或者更新一个rowKey数据,一个Put里有一个rowKey,可能有多个列族和列名*/
233     public void put(String tableNameString, Put put) throws IOException {
234         TableName tableName = TableName.valueOf(tableNameString);
235         Table table = connection.getTable(tableName);
236         if (put != null && put.size() > 0) {
237             table.put(put);
238         }
239         table.close();
240     }
241 
242     public void put(String table, String[] rows, String[] fams, String[] quals,
243                     long[] ts, String[] vals) throws IOException {
244         put(TableName.valueOf(table), rows, fams, quals, ts, vals);
245     }
246 
247     /**用于测试数据*/
248     public void put(TableName table, String[] rows, String[] fams, String[] quals,
249                     long[] ts, String[] vals) throws IOException {
250         Table tbl = connection.getTable(table);
251         for (String row : rows) {
252             Put put = new Put(Bytes.toBytes(row));
253             for (String fam : fams) {
254                 int v = 0;
255                 for (String qual : quals) {
256                     String val = vals[v < vals.length ? v : vals.length - 1];
257                     long t = ts[v < ts.length ? v : ts.length - 1];
258                     System.out.println("Adding: " + row + " " + fam + " " + qual +
259                             " " + t + " " + val);
260                     put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), t,
261                             Bytes.toBytes(val));
262                     v++;
263                 }
264             }
265             tbl.put(put);
266         }
267         tbl.close();
268     }
269 
270     public void dump(String table, String[] rows, String[] fams, String[] quals)
271             throws IOException {
272         dump(TableName.valueOf(table), rows, fams, quals);
273     }
274 
275     /**测试dump数据*/
276     public void dump(TableName table, String[] rows, String[] fams, String[] quals)
277             throws IOException {
278         Table tbl = connection.getTable(table);
279         List<Get> gets = new ArrayList<Get>();
280         for (String row : rows) {
281             Get get = new Get(Bytes.toBytes(row));
282             get.readAllVersions();
283             if (fams != null) {
284                 for (String fam : fams) {
285                     for (String qual : quals) {
286                         get.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual));
287                     }
288                 }
289             }
290             gets.add(get);
291         }
292         org.apache.hadoop.hbase.client.Result[] results = tbl.get(gets);
293         for (org.apache.hadoop.hbase.client.Result result : results) {
294             for (Cell cell : result.rawCells()) {
295                 System.out.println("Cell: " + cell +
296                         ", Value: " + Bytes.toString(cell.getValueArray(),
297                         cell.getValueOffset(), cell.getValueLength()));
298             }
299         }
300         tbl.close();
301     }
302 
303 
304     public void dump(String table) throws IOException {
305         dump(TableName.valueOf(table));
306     }
307 
308     public void dump(TableName table) throws IOException {
309         try (
310                 Table t = connection.getTable(table);
311                 ResultScanner scanner = t.getScanner(new Scan());
312         ) {
313             for (org.apache.hadoop.hbase.client.Result result : scanner) {
314                 dumpResult(result);
315             }
316         }
317     }
318 
319     /**从Cell取Array要加上位移和长度,不然数据不正确*/
320     public void dumpResult(Result result) {
321         for (Cell cell : result.rawCells()) {
322             System.out.println("Cell: " + cell +
323                     ", Value: " + Bytes.toString(cell.getValueArray(),
324                     cell.getValueOffset(), cell.getValueLength()));
325         }
326     }
327 
328     public void dumpCells(String key, List<Cell> list) {
329         for (Cell cell : list) {
330             String columnFamily = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
331             String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
332             String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
333             System.out.printf("[key:%s]	[family:%s] [column:%s] [value:%s]
",
334                     key, columnFamily, columnName, value);
335         }
336     }
337 
338 
339     /**批量插入数据,list里每个map就是一条数据,并且按照rowKey columnFamily columnName columnValue放入map的key和value*/
340     public void bulkInsert(String tableNameString, List<Map<String, Object>> list) throws IOException {
341         Table table = connection.getTable(TableName.valueOf(tableNameString));
342         List<Put> puts = new ArrayList<Put>();
343         if (list != null && list.size() > 0) {
344             for (Map<String, Object> map : list) {
345                 Put put = new Put(Bytes.toBytes(map.get("rowKey").toString()));
346                 put.addColumn(Bytes.toBytes(map.get("columnFamily").toString()),
347                         Bytes.toBytes(map.get("columnName").toString()),
348                         Bytes.toBytes(map.get("columnValue").toString()));
349                 puts.add(put);
350             }
351         }
352         table.put(puts);
353         table.close();
354     }
355 
356     /**批量插入*/
357     public void bulkInsert2(String tableNameString, List<Put> puts) throws IOException {
358         Table table = connection.getTable(TableName.valueOf(tableNameString));
359         if (puts != null && puts.size() > 0) {
360             table.put(puts);
361         }
362         table.close();
363     }
364 
365     /**根据rowKey删除所有行数据*/
366     public void deleteByKey(String tableNameString, String rowKey) throws IOException {
367         Table table = connection.getTable(TableName.valueOf(tableNameString));
368         Delete delete = new Delete(Bytes.toBytes(rowKey));
369 
370         table.delete(delete);
371         table.close();
372     }
373 
374     /**根据rowKey和列族删除所有行数据*/
375     public void deleteByKeyAndFamily(String tableNameString, String rowKey, String columnFamily) throws IOException {
376         Table table = connection.getTable(TableName.valueOf(tableNameString));
377         Delete delete = new Delete(Bytes.toBytes(rowKey));
378         delete.addFamily(Bytes.toBytes(columnFamily));
379 
380         table.delete(delete);
381         table.close();
382     }
383 
384     /**根据rowKey、列族删除多个列的数据*/
385     public void deleteByKeyAndFC(String tableNameString, String rowKey,
386                                  String columnFamily, List<String> columnNames) throws IOException {
387         Table table = connection.getTable(TableName.valueOf(tableNameString));
388         Delete delete = new Delete(Bytes.toBytes(rowKey));
389         for (String columnName : columnNames) {
390             delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
391         }
392         table.delete(delete);
393         table.close();
394     }
395 
396 
397     /**根据rowkey,获取所有列族和列数据*/
398     public List<Cell> getRowByKey(String tableNameString, String rowKey) throws IOException {
399         Table table = connection.getTable(TableName.valueOf(tableNameString));
400 
401         Get get = new Get(Bytes.toBytes(rowKey));
402 
403         org.apache.hadoop.hbase.client.Result result = table.get(get);
404 
405 //        Cell[] cells = result.rawCells();
406         List<Cell> list = result.listCells();
407         table.close();
408         return list;
409     }
410 
411     /**根据rowKey,family,qualifier获取列值*/
412     public List<Cell> getRowByKeyAndColumn(String tableNameString, String rowKey, String cf, String clName) throws IOException {
413         Table table = connection.getTable(TableName.valueOf(tableNameString));
414         Get get = new Get(Bytes.toBytes(rowKey));
415         get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(clName));
416 
417         org.apache.hadoop.hbase.client.Result result = table.get(get);
418         List<Cell> list = result.listCells();
419         table.close();
420         return list;
421     }
422 
423     /**根据rowkey,获取所有列族和列数据*/
424     public Map<String, List<Cell>> getRowByKeys(String tableNameString, String... rowKeys) throws IOException {
425         Table table = connection.getTable(TableName.valueOf(tableNameString));
426 
427         List<Get> gets = new ArrayList<>();
428         for (String rowKey : rowKeys) {
429             Get get = new Get(Bytes.toBytes(rowKey));
430             gets.add(get);
431         }
432 
433         org.apache.hadoop.hbase.client.Result[] results = table.get(gets);
434 
435         Map<String, List<Cell>> map = new HashMap<>();
436         for (org.apache.hadoop.hbase.client.Result res : results) {
437             map.put(Bytes.toString(res.getRow()), res.listCells());
438         }
439 
440         table.close();
441         return map;
442     }
443 
444     private Map<String, List<Cell>> formatToMap(String tableNameString,Scan scan) throws IOException{
445         //确保table和scanner被释放
446         try (Table table = connection.getTable(TableName.valueOf(tableNameString));
447              ResultScanner scanner = table.getScanner(scan);
448         ) {
449             Map<String, List<Cell>> map = new HashMap<>();
450             for (org.apache.hadoop.hbase.client.Result result : scanner) {
451                 map.put(Bytes.toString(result.getRow()), result.listCells());
452             }
453             return map;
454         }
455     }
456 
457     /**根据rowKey过滤数据,rowKey可以使用正则表达式
458      * 返回rowKey和Cells的键值对
459     **/
460     public Map<String, List<Cell>> filterByRowKeyRegex(String tableNameString, String rowKey, CompareOperator operator) throws IOException {
461 
462         Scan scan = new Scan();
463         //使用正则
464         RowFilter filter = new RowFilter(operator, new RegexStringComparator(rowKey));
465 
466         //包含子串匹配,不区分大小写。
467 //        RowFilter filter = new RowFilter(operator,new SubstringComparator(rowKey));
468 
469         scan.setFilter(filter);
470 
471         return formatToMap(tableNameString,scan);
472     }
473 
474     //根据列族,列名,列值(支持正则)查找数据
475     //返回值:如果查询到值,会返回所有匹配的rowKey下的各列族、列名的所有数据(即使查询的时候这些列族和列名并不匹配)
476     public Map<String, List<Cell>> filterByValueRegex(String tableNameString, String family, String colName,
477                                                       String value, CompareOperator operator) throws IOException {
478         Scan scan = new Scan();
479 
480         //正则匹配
481         SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family),
482                 Bytes.toBytes(colName), operator, new RegexStringComparator(value));
483 
484         //完全匹配
485 //        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family),
486 //                Bytes.toBytes(colName),operator,Bytes.toBytes(value));
487 
488         //SingleColumnValueExcludeFilter排除列值
489 
490         //要过滤的列必须存在,如果不存在,那么这些列不存在的数据也会返回。如果不想让这些数据返回,设置setFilterIfMissing为true
491         filter.setFilterIfMissing(true);
492         scan.setFilter(filter);
493 
494         return formatToMap(tableNameString,scan);
495     }
496 
497     //根据列名前缀过滤数据
498     public Map<String, List<Cell>> filterByColumnPrefix(String tableNameString, String prefix) throws IOException {
499 
500         //列名前缀匹配
501         ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes(prefix));
502 
503         //QualifierFilter 用于列名多样性匹配过滤
504 //        QualifierFilter filter = new QualifierFilter(CompareOperator.EQUAL,new SubstringComparator(prefix));
505 
506         //多个列名前缀匹配
507 //        MultipleColumnPrefixFilter multiFilter = new MultipleColumnPrefixFilter(new byte[][]{});
508 
509         Scan scan = new Scan();
510         scan.setFilter(filter);
511 
512         return formatToMap(tableNameString,scan);
513     }
514 
515     //根据列名范围以及列名前缀过滤数据
516     public Map<String, List<Cell>> filterByPrefixAndRange(String tableNameString, String colPrefix,
517                                                           String minCol, String maxCol) throws IOException {
518 
519         //列名前缀匹配
520         ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes(colPrefix));
521 
522         //列名范围扫描,上下限范围包括
523         ColumnRangeFilter rangeFilter = new ColumnRangeFilter(Bytes.toBytes(minCol), true,
524                 Bytes.toBytes(maxCol), true);
525 
526         FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
527         filterList.addFilter(filter);
528         filterList.addFilter(rangeFilter);
529 
530         Scan scan = new Scan();
531         scan.setFilter(filterList);
532 
533         return formatToMap(tableNameString,scan);
534     }
535 
536 
537 }
View Code

2、HbaseController.java代码

package com.vue.demo.controller;

import com.vue.demo.service.serviceimpl.HbaseServiceImpl;
import com.vue.demo.utils.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
 * @author yangwj
 * @date 2020/9/8 14:21
 * @desc
 */
@RestController
public class HbaseController {

    @Autowired
    private HbaseServiceImpl hbaseService;

    @RequestMapping(value = "/createTable")
    @CrossOrigin
    public Result createTable(String table, String... columnFamily) {
       return hbaseService.createTable(table,columnFamily);
    }

    @RequestMapping(value = "/insertData")
    @CrossOrigin
    public Result insertData(String table, String row, String fam, String qual, String val) {
       return hbaseService.putData(table, row, fam, qual, val);
    }

    @RequestMapping(value = "/addFamily")
    @CrossOrigin
    public Result addFamily(String table, String fam) {
       return hbaseService.addFamily(table,fam);
    }

    @RequestMapping(value = "/getDataByRowByKeyAndColumn")
    @CrossOrigin
    public Result getDataByRowByKeyAndColumn(String table, String rowKey, String cf, String clName) {
        return hbaseService.getDataByRowByKeyAndColumn(table, rowKey, cf, clName);
    }

    @RequestMapping(value = "/deleteByKeyAndFamily")
    @CrossOrigin
    public Result deleteByKeyAndFamily(String tableNameStr, String rowKey, String columnFamily) {
        return hbaseService.deleteByKeyAndFamily(tableNameStr, rowKey, columnFamily);
    }
}

3、HbaseService.java代码

package com.vue.demo.service;

import com.vue.demo.utils.Result;

import java.io.IOException;

/**
 * @author yangwj
 * @date 2020/9/8 14:21
 * @desc
 */
public interface HbaseService {

    /**
     * 创建表
     * @param table 表名
     * @param columnFamily 列簇
     * @return
     */
    Result createTable(String table, String... columnFamily) throws IOException;

    /**
     * 添加列簇
     * @param table 表名
     * @param columnFamily 列簇
     */
    Result addFamily(String table, String... columnFamily) throws IOException;

    /**
     * 插入数据
     * @param table 表名
     * @param row rowkey
     * @param fam 列簇
     * @param qual 字段名
     * @param val  字段值
     */
    Result putData(String table, String row, String fam, String qual, String val);

    /**
     * 根据表名和rowkey删除数据
     * @param tableNameString 表名
     * @param rowKey rowkey
     */
    Result deleteByKey(String tableNameString, String rowKey);

    /**
     * 根据 rowkey ,cf ,column 获取数据
     * @param table 表名
     * @param rowKey rowkey
     * @param cf 列簇
     * @param clName 字段名
     */
    Result getDataByRowByKeyAndColumn(String table, String rowKey, String cf, String clName);

    /**
     * 根据rowkey、columnFamily 删除多个数据
     * @param tableNameStr 表名
     * @param rowKey rowkey
     * @param columnFamily 列簇
     * @return
     */
    Result deleteByKeyAndFamily(String tableNameStr, String rowKey, String columnFamily);


}

4、HbaseServiceImpl.java

package com.vue.demo.service.serviceimpl;

import com.vue.demo.service.HbaseService;
import com.vue.demo.utils.HBaseHelper;
import com.vue.demo.utils.ResponseResult;
import com.vue.demo.utils.Result;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author yangwj
 * @date 2020/9/8 14:21
 * @desc
 */
@Service
public class HbaseServiceImpl implements HbaseService {

    private static final Logger log = LoggerFactory.getLogger(HbaseServiceImpl.class);

    static HBaseHelper helper;
    static Configuration conf ;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");
        try {
            helper = HBaseHelper.getHBaseHelper(conf);
        } catch (IOException e) {
            log.error("hbase init error, the error is {} ",e.getMessage());
        }
    }

    @Override
    public Result createTable(String table, String... columnFamily){
        try {
            helper.createTable(table,columnFamily);
        } catch (IOException e) {
            log.error("hbase create table fail, the table name is {},and error msg is {}",table,e.getMessage());
            return ResponseResult.error(400,e.getMessage());
        }
        return ResponseResult.success();
    }

    @Override
    public Result addFamily(String table, String... columnFamily) {
        try {
            helper.addFamily(table,columnFamily);
        } catch (IOException e) {
            log.error("hbase addFamily fail, the table name is {},and error msg is {}",table,e.getMessage());
            return ResponseResult.error(400,e.getMessage());
        }
        return ResponseResult.success();
    }

    @Override
    public Result putData(String table, String row, String fam, String qual, String val){
        try {
            helper.put(table, row, fam, qual, val);
        } catch (IOException e) {
            log.error("insert data to hbase fail , the error msg is {},and table={},row={}, fam={},qual={},val={}",e.getMessage(),table,row,fam,qual,val);
            return ResponseResult.error(400,e.getMessage());
        }
        return ResponseResult.success();
    }

    @Override
    public Result deleteByKey(String tableNameStr, String rowKey) {
        try {
            helper.deleteByKey(tableNameStr,rowKey);
        } catch (IOException e) {
            log.error("delete data by key fail ,table={},rowkey={}, and the error msg is:{}",tableNameStr,rowKey,e.getMessage());
            return ResponseResult.error(400,e.getMessage());
        }
        return ResponseResult.success();
    }

    @Override
    public Result getDataByRowByKeyAndColumn(String table, String rowKey, String cf, String clName) {
        List<Cell> cells = new ArrayList<>();
        try {
            cells = helper.getRowByKeyAndColumn(table, rowKey, cf, clName);
            this.dumpCells(cells,rowKey);
        } catch (IOException e) {
            log.error("insert data to hbase fail , the error msg is {},and table={},row={}, fam={},qual={}",e.getMessage(),table,rowKey,cf,clName);
            return ResponseResult.error(400,e.getMessage());
        }
        return ResponseResult.success();
    }

    @Override
    public Result deleteByKeyAndFamily(String tableNameStr, String rowKey, String columnFamily) {
        try {
            helper.deleteByKeyAndFamily(tableNameStr, rowKey, columnFamily);
        } catch (IOException e) {
            return ResponseResult.error(400,e.getMessage());
        }
        return ResponseResult.success();
    }

    /**
     * 将数据封装
     * @param list Cell
     * @param rowkey rowkey
     */
    public void dumpCells(List<Cell> list,String rowkey){
        list.stream().forEach(cell -> {
            String cf = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
            String cn = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
            //todo 带改进
            System.out.printf("[key:%s]	[family:%s] [column:%s] [value:%s]
", rowkey,cf,cn,value);
        });

    }


}

###############以上是Web的基本用法,下面的测试是对上面的补充######################

测试HbaseDemo.java

package com.vue.demo.controller;


import com.vue.demo.utils.HBaseHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Result;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author yangwj
 * @date 2020/9/7 10:12
 * @desc
 */
public class HbaseDemo {

    static HBaseHelper helper;
    final static String tableNameString ="demoTable" ;

    public static void main(String args[]) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");
//        conf.set("hbase.rootdir", "file:///opt/hbase_data");
//        conf.set("hbase.zookeeper.property.dataDir", "/opt/hbase_data/zookeeper");

        helper = HBaseHelper.getHBaseHelper(conf);

        //创建测试数据
        //createDemoTable();

        //测试cas
        //CheckAndMutateExample();

        //测试get
        //getData();

        //批量处理数据
        //batchData();

        //创建testtable表数据
        //createTestTable("testtable");

        //分页过滤
        //pageFilterData();
//            跳转过滤
        //skipFilterData();
//         多个过滤器组合
        mutilFilterData();
        //自定义过滤
        //customFilterData();

    }

    //清除并插入测试数据
    private static void createDemoTable() throws IOException {
        if (helper.existsTable(tableNameString))
        { helper.dropTable(tableNameString);}
        helper.createTable(tableNameString, 100, "cf1", "cf2");
        helper.put(tableNameString,
                new String[]{"row1"},
                new String[]{"cf1", "cf2"},
                new String[]{"qual1", "qual2", "qual3"},
                new long[]{1, 2, 3},
                new String[]{"val1", "val2", "val3"});
        helper.put(tableNameString,
                new String[]{"row2"},
                new String[]{"cf1", "cf2"},
                new String[]{"qual1", "qual2", "qual3"},
                new long[]{1, 2, 3},
                new String[]{"val1", "val2", "val3"});
        System.out.println("put data...");
        helper.dump(tableNameString);
    }

    //创建testtable测试数据
    private static void createTestTable(String tableNameString) throws IOException{
        if(tableNameString.isEmpty()) tableNameString = "testtable";
        if(helper.existsTable(tableNameString)){
            helper.dropTable(tableNameString);
        }
        helper.createTable(tableNameString,"info","ex","memo");

        List<Put> puts = new ArrayList<>();
        for(int i=0;i<100;i++){
            String rowKey = "rowKey"+i;
            Put put = new Put(Bytes.toBytes(rowKey));

            String columnFamily = "info";
            String columnName = "username";
            String columnValue = "user"+i;
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            columnFamily = "ex";
            columnName = "addr";
            columnValue = "street"+i;
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            columnFamily = "memo";
            columnName = "detail";
            columnValue = "remark"+i;
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            puts.add(put);
        }
        helper.bulkInsert2(tableNameString,puts);
    }


    //测试操作原子性compare-and-set
    private static void CheckAndMutateExample() throws IOException {
        Table table = helper.getConnection().getTable(TableName.valueOf(tableNameString));
        boolean res = false;
        Put put = null;

        put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual4"), 1, Bytes.toBytes("val1"));
//        如果row1 cf1 qual4 不存在值就插入put数据,存在则不插入
        res = table.checkAndMutate(Bytes.toBytes("row1"), Bytes.toBytes("cf1"))
                .qualifier(Bytes.toBytes("qual4"))
                .ifNotExists()
                .thenPut(put);
        System.out.println("1 result is (expected true) :" + res);

//        put = new Put(Bytes.toBytes("row1"));
//        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual1"), 4, Bytes.toBytes("val1"));
//        //如果row1 cf1 qual1 val1存在就插入put,因为这个value已经存在所以可以插入,结果返回true,时间戳变为4
//        res = table.checkAndMutate(Bytes.toBytes("row1"), Bytes.toBytes("cf1"))
//                .qualifier(Bytes.toBytes("qual1")).ifEquals(Bytes.toBytes("val1"))
//                .thenPut(put);
//        System.out.println("2 result is (expected true) :" + res);

//        put = new Put(Bytes.toBytes("row1"));
//        put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("qual1"),5,Bytes.toBytes("val2"));
//        ////如果row1 cf1 qual1 不等于val2在就插入put
//        res = table.checkAndMutate(Bytes.toBytes("row1"), Bytes.toBytes("cf1"))
//                .qualifier(Bytes.toBytes("qual1"))
//                .ifMatches(CompareOperator.NOT_EQUAL,Bytes.toBytes("val2"))
//                .thenPut(put);
//        System.out.println("3 result is (expected true) :" + res);

        put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual5"),1,Bytes.toBytes("val1"));
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("qual6"),1,Bytes.toBytes("val1"));

        Delete delete = new Delete(Bytes.toBytes("row1"));
        delete.addColumns(Bytes.toBytes("cf1"), Bytes.toBytes("qual4"));

        //RowMutations这个版本还没定型
        RowMutations mutations = new RowMutations(Bytes.toBytes("row1"));
        mutations.add(put);
        mutations.add(delete);

        //row1 cf1 qual4 val1存在,row1 cf1 qual5和row1 cf1 qual6无值则插入qual5和qual6的值,并删除qual4的值
        res = table.checkAndMutate(Bytes.toBytes("row1"),Bytes.toBytes("cf1")).qualifier(Bytes.toBytes("qual4"))
                .ifEquals(Bytes.toBytes("val1"))
                .qualifier(Bytes.toBytes("qual5")).ifNotExists()
                .qualifier(Bytes.toBytes("qual6")).ifNotExists()
                .thenMutate(mutations);
        System.out.println("1 result is (expected true) :" + res);


    }

    //get测试
    private static void getData() throws IOException{
        String key = "row1";
        String cf = "cf1";
        String cl = "qual1";

        System.out.println("first get:");
        helper.dumpCells(key,helper.getRowByKeyAndColumn(tableNameString,key,cf,cl));



        System.out.println("second get:");
        String tableName = "testtable";
        Map<String,List<Cell>> map = helper.getRowByKeys(tableName,"rowKey7","rowKey8","rowKey9");
        for(Map.Entry<String,List<Cell>> entry:map.entrySet()){
            helper.dumpCells(entry.getKey(),entry.getValue());
        }
    }

    //批处理数据,测试数据demoTable
    //注意:同一个rowKey不能同时使用put和delete
    private static void batchData() throws IOException{
        Table table = helper.getConnection().getTable(TableName.valueOf(tableNameString));

        byte[] row1 = Bytes.toBytes("row1");
        byte[] row2 = Bytes.toBytes("row2");
        byte[] cf1 = Bytes.toBytes("cf1");
        byte[] cf2 = Bytes.toBytes("cf2");
        byte[] qualifier1 = Bytes.toBytes("qual1");
        byte[] qualifier2 = Bytes.toBytes("qual2");

        List<Row> list = new ArrayList<>();


        //增/改
        Put put = new Put(row1);
        put.addColumn(cf1,qualifier1,5,Bytes.toBytes("row1_batch1"));
        put.addColumn(cf2,qualifier2,5,Bytes.toBytes("row1_batch2"));
        list.add(put);
        //
        Get get = new Get(row1);
        get.addColumn(cf1,qualifier1);
        get.addColumn(cf2,qualifier2);
        list.add(get);
        //
        Delete delete = new Delete(row2);
        delete.addColumns(cf1,qualifier2);
        list.add(delete);
        //删除不存在的
        get = new Get(row2);
        get.addFamily(Bytes.toBytes("noexists")); //列族不存在,这里将抛出异常
        list.add(get);

        Object[] results = new Object[list.size()];

        try {
            table.batch(list,results);
        }catch (Exception e){
            e.printStackTrace();
        }

        for(int i=0;i<results.length;i++){
            System.out.println("result["+i+"]: type = "+results[i].getClass().getSimpleName()+results[i]);
        }

        table.close();
        helper.dump(tableNameString);
        helper.close();
    }

    //分页过滤
    private static void pageFilterData() throws IOException{
        Table table = helper.getConnection().getTable(TableName.valueOf("testtable"));
        final byte[] POSTFIX = new byte[] { 0x00 };
        Filter filter = new PageFilter(10);

        int totalRows = 0;
        byte[] lastRow = null;
        while(true){
            Scan scan = new Scan();
            scan.setFilter(filter);
            if(lastRow!=null){

                //为了兼容以前的scan.setStartRow()代码
                //在上一次的最后一行加上一个空的byte数据,在下一个分页上,就会以新的key开始,
                // 但是实际上这个key并不存在,所以还是从真正的下一行开始扫描
                //这么做的原因是scan的扫描会自动包含起始行,如果不加空字节数据,那么定位上就会把上一次的最后一行作为起始行,最后的数据就会多一行。
                //而,新的api是withStartRow(byte[] startRow, boolean inclusive),可以直接设置是否包含起始行,完美解决问题,但是又保留了对
                //以前api函数的兼容性
//                byte[] startRow = Bytes.add(lastRow,POSTFIX);
//                System.out.println("start row: " + Bytes.toStringBinary(startRow));
//                scan.withStartRow(startRow,true);


                System.out.println("start row: " + Bytes.toStringBinary(lastRow));
                //不包含起始行,所以可以直接使用上一次的最后一行作为起始行
                scan.withStartRow(lastRow,false);
            }
            ResultScanner scanner = table.getScanner(scan);
            int localRows = 0;
            Result result;
            while ((result=scanner.next())!=null){
                System.out.println(localRows++ + ": " + result);
                totalRows++;
                lastRow = result.getRow();
            }
            scanner.close();
            if(localRows==0){break;}
        }
        table.close();
        System.out.println("total rows: " + totalRows);
    }


    //跳转过滤
    private static void skipFilterData() throws IOException{
        Table table = helper.getConnection().getTable(TableName.valueOf("demoTable"));
        Filter filter = new ValueFilter(CompareOperator.EQUAL,new BinaryComparator(Bytes.toBytes("val2")));

        Scan scan = new Scan();
        scan.setFilter(filter);

        ResultScanner scanner1 = table.getScanner(scan);
        System.out.println("Results of scan #1:");
        int n = 0;
        for (Result result : scanner1) {
            for (Cell cell : result.rawCells()) {
                System.out.println("Cell: " + cell + ", Value: " +
                        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                                cell.getValueLength()));
                n++;
            }
        }
        scanner1.close();

        //应用跳转过滤
        Filter skipFilter  = new SkipFilter(filter);
        Scan scan2 = new Scan();
        scan2.setFilter(skipFilter);
        ResultScanner scanner2 = table.getScanner(scan2);
        System.out.println("Total cell count for scan #1: " + n);
        n = 0;
        System.out.println("Results of scan #2:");
        for (Result result : scanner2) {
            for (Cell cell : result.rawCells()) {
                System.out.println("Cell: " + cell + ", Value: " +
                        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                                cell.getValueLength()));
                n++;
            }
        }
        scanner2.close();
        table.close();
        System.out.println("Total cell count for scan #2: " + n);

    }

    //多个过滤器
    //使用FilterList要保证过滤器的顺序需要使用List<Filter>
    private static void mutilFilterData() throws IOException{
        Table table = helper.getConnection().getTable(TableName.valueOf("testtable"));

        List<Filter> filters = new ArrayList<Filter>();

        Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
                new BinaryComparator(Bytes.toBytes("rowKey60")));
        filters.add(filter1);

        Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,
                new BinaryComparator(Bytes.toBytes("rowKey69")));
        filters.add(filter2);

        Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,
                new RegexStringComparator("username"));
        filters.add(filter3);

        FilterList filterList1 = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

        Scan scan = new Scan();
        scan.setFilter(filterList1);
        ResultScanner scanner1 = table.getScanner(scan);
        System.out.println("Results of scan #1 - MUST_PASS_ALL:");
        int n = 0;
        for (Result result : scanner1) {
            for (Cell cell : result.rawCells()) {
                System.out.println("Cell: " + cell + ", Value: " +
                        Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                                cell.getValueLength()));
                n++;
            }
        }
        scanner1.close();
        table.close();

    }

    //使用自定义过滤器,只显示匹配列值的行
    private static void customFilterData() throws IOException{
        Table table = helper.getConnection().getTable(TableName.valueOf("testtable"));

        List<Filter> filters = new ArrayList<Filter>();

//        Filter filter1 = new CustomFilter(Bytes.toBytes("user30"));
//        filters.add(filter1);
//
//        Filter filter2 = new CustomFilter(Bytes.toBytes("user20"));
//        filters.add(filter2);
//
//        Filter filter3 = new CustomFilter(Bytes.toBytes("user90"));
//        filters.add(filter3);

        FilterList filterList = new FilterList(
                FilterList.Operator.MUST_PASS_ONE, filters);

        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner scanner = table.getScanner(scan);
        for(Result result:scanner){
            helper.dumpResult(result);
        }

        scanner.close();
        table.close();
    }
}
View Code

测试MyHBase.java

package com.vue.demo.controller;


import com.vue.demo.utils.HBaseHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author yangwj
 * @date 2020/9/7 15:06
 * @desc
 */
public class MyHBase {
    Configuration conf;

    HBaseHelper helper;

    //初始化
    private void setUp() throws IOException{
        conf = HBaseConfiguration.create();
        //conf.set("hbase.master","192.168.31.10");
        //The port the HBase Master should bind to.
//        conf.set("hbase.master.port","16000");

        //The port for the HBase Master web UI. Set to -1 if you do not want a UI instance run.
//        conf.set("hbase.master.info.port","16010");

        //The port the HBase RegionServer binds to.
//        conf.set("hbase.regionserver.port","16020");

        //The port for the HBase RegionServer web UI Set to -1 if you do not want the RegionServer UI to run.
//        conf.set("hbase.regionserver.info.port","16030");

        conf.set("hbase.zookeeper.quorum", "localhost:2181");

        //Property from ZooKeeper’s config zoo.cfg. The port at which the clients will connect.
        // HBase数据库使用的端口
        //conf.set("hbase.zookeeper.property.clientPort", "2181");

        //单机
        //conf.set("hbase.rootdir","file:///opt/hbase_data");
        //conf.set("hbase.zookeeper.property.dataDir","/opt/hbase_data/zookeeper");

        helper = HBaseHelper.getHBaseHelper(conf);
    }


    //user表插入测试数据
    private void insertUserData() throws IOException{
        // 取得数据表对象
        Table table = helper.getConnection().getTable(TableName.valueOf("user"));

        // 需要插入数据库的数据集合

        List<Put> putList = new ArrayList<Put>();

        Put put;

        // 生成数据集合
        for(int i = 0; i < 10; i++){
            put = new Put(Bytes.toBytes("user" + i));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("username"), Bytes.toBytes("中文战士" + i));
            put.addColumn(Bytes.toBytes("ex"), Bytes.toBytes("addr"), Bytes.toBytes("北京路1088号:" + i));

            putList.add(put);
        }

        // 将数据集合插入到数据库
        table.put(putList);
        table.close();


    }

    //查询导出所有数据
    private void queryAll(String tableNameString) throws IOException{
        System.out.println("导出数据:"+tableNameString);
        // 取得数据表对象
        Table table = helper.getConnection().getTable(TableName.valueOf(tableNameString));

        // 取得表中所有数据
        ResultScanner scanner = table.getScanner(new Scan());

        for(Result result:scanner){
            byte[] row = result.getRow();
            System.out.println("row key is:" + new String(row));

            List<Cell> cellList = result.listCells();
            for(Cell cell:cellList){
                byte[] familyArray = cell.getFamilyArray();
                byte[] qualifierArray = cell.getQualifierArray();
                byte[] valueArray = cell.getValueArray();

                String s1 = "row familyArray is:" + Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
                String s2 = "row qualifierArray is:" + Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
                String s3 = "row value is:" + Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());

                System.out.println(s1+",  "+s2+",  "+s3+"	"+cell.getTimestamp());

                System.out.println();

                //直接使用出现乱码,要使用Bytes类,并且需要使用offset和length
//                System.out.println("row value is:" + new String(familyArray, StandardCharsets.UTF_8) + new String(qualifierArray,StandardCharsets.UTF_8)
//                        + new String(valueArray,StandardCharsets.UTF_8));
            }
        }
        scanner.close();
        table.close();

        System.out.println("导出结束...");
        System.out.println();
    }

    private void close() throws IOException{

        helper.close();
    }

    //插入testtable表数据
    private void initTestTable() throws IOException{
        String tableNameString = "user";
        if(helper.existsTable(tableNameString)){
            helper.disableTable(tableNameString);
            helper.dropTable(tableNameString);
        }

        helper.createTable(tableNameString,"info","ex","memo");
        helper.put(tableNameString,"row1","info","username","admin");
        helper.put(tableNameString,"row1","ex","addr","北京大道");
        helper.put(tableNameString,"row1","memo","detail","超级用户,地址:北京大道");


        helper.put(tableNameString,"row2","info","username","guest");
        helper.put(tableNameString,"row2","ex","addr","全国各地");
        helper.put(tableNameString,"row2","memo","detail","游客,地址:全国到处都是");

        helper.close();
    }

    private void bulkInsertTestTable() throws IOException{
        String tableNameString = "testtable";
        if(!helper.existsTable(tableNameString)){
            helper.createTable(tableNameString,"info","ex","memo");
        }

        System.out.println(".........批量插入数据start.........");
        List<Map<String,Object>> mapList = new ArrayList<>();
        for(int i=1;i<201;i++){
            Map<String,Object> map = new HashMap<>();
            map.put("rowKey","testKey"+i);
            map.put("columnFamily","info");
            map.put("columnName","username");
            map.put("columnValue","guest"+i);

            map.put("rowKey","testKey"+i);
            map.put("columnFamily","ex");
            map.put("columnName","addr");
            map.put("columnValue","北京路"+i+"号");

            map.put("rowKey","testKey"+i);
            map.put("columnFamily","memo");
            map.put("columnName","detail");
            map.put("columnValue","联合国地球村北京路第"+i+"号");

            mapList.add(map);
        }

        helper.bulkInsert(tableNameString,mapList);

        System.out.println(".........批量插入数据end.........");
    }

    private void insertByRowKey(String table,String rowKey) throws IOException{
        Put put = new Put(Bytes.toBytes(rowKey));

        String columnFamily ;
        String columnName ;
        String columnValue ;
        for(int i=0;i<10;i++){
            columnFamily = "info";
            columnName = "username"+i;
            columnValue = "user111";
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            columnFamily = "ex";
            columnName = "addr"+i;
            columnValue = "street 111";
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            columnFamily = "memo";
            columnName = "detail"+i;
            columnValue = "sssss zzz 111222 ";
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));
        }
        System.out.println("----> put size:"+put.size());

        helper.put(table,put);

    }

    private void bulkInsertTestTable2(String tableNameString) throws IOException{
//        String tableNameString = "testtable";
        if(!helper.existsTable(tableNameString)){
            helper.createTable(tableNameString,"info","ex","memo");
        }

        List<Put> puts = new ArrayList<>();
        for(int i=0;i<10;i++){
            String rowKey = "rowKey"+i;
            Put put = new Put(Bytes.toBytes(rowKey));

            String columnFamily = "info";
            String columnName = "username2";
            String columnValue = "user"+i;
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            columnFamily = "ex";
            columnName = "addr2";
            columnValue = "street "+i;
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            columnFamily = "memo";
            columnName = "detail2";
            columnValue = "aazzdd "+i;
            put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(columnValue));

            System.out.println("put size:"+put.size());
            puts.add(put);
        }
        helper.bulkInsert2(tableNameString,puts);
    }

    private void dumpTable(String tableNameString) throws IOException{
        helper.dump(tableNameString);
        helper.close();
    }

    private void deleteByKey() throws IOException{
        String tableNameString = "testtable";
        String rowKey = "rowKey0";
        helper.deleteByKey(tableNameString,rowKey);
    }

    private void deleteByKeyAndFamily() throws IOException{
        String tableNameString = "testtable";
        String rowKey = "rowKey1";
        String columnFamily="ex";
        helper.deleteByKeyAndFamily(tableNameString,rowKey,columnFamily);
    }

    private void deleteByKeyAndFC() throws IOException{
        String tableNameString = "testtable";
        String rowKey = "rowKey3";
        String columnFamily="ex";
        List<String> list = new ArrayList<>();
//        list.add("addr");
        list.add("addr2");
        helper.deleteByKeyAndFC(tableNameString,rowKey,columnFamily,list);
    }

    private void createTableBySplitKey() throws IOException{
        String tableNameString = "testtable2";
        byte[][] splitKeys = {
                Bytes.toBytes("10"),
                Bytes.toBytes("60"),
                Bytes.toBytes("120"),
        };
        helper.createTable(tableNameString,splitKeys,"info","ex","memo");
    }

    private void getDataByRowKey(String table,String rowKey) throws IOException{
        List<Cell> cells = helper.getRowByKey(table,rowKey);
        dumpCells(rowKey,cells);
    }

    private void getDataByRowKeyFilter(String table,String rowKey) throws IOException{
        Map<String,List<Cell>> map = helper.filterByRowKeyRegex(table,rowKey, CompareOperator.EQUAL);
        for(Map.Entry<String,List<Cell>> entry: map.entrySet()){
            String key = entry.getKey();
            List<Cell> list = entry.getValue();
            dumpCells(key,list);
        }
    }

    private void getDataByValueFilter(String table,String family,String colName,String colValue) throws IOException{
        Map<String,List<Cell>> map = helper.filterByValueRegex(table,family,colName,colValue,CompareOperator.EQUAL);
        for(Map.Entry<String,List<Cell>> entry: map.entrySet()){
            String key = entry.getKey();
            List<Cell> list = entry.getValue();
            dumpCells(key,list);
        }
    }

    private void getDataByColumnPrefix(String table,String prefix) throws IOException{
        Map<String,List<Cell>> map = helper.filterByColumnPrefix(table,prefix);
        for(Map.Entry<String,List<Cell>> entry:map.entrySet()){
            String key = entry.getKey();
            List<Cell> list = entry.getValue();
            dumpCells(key,list);
        }
    }

    private void dumpCells(String key,List<Cell> list){
        for(Cell cell:list){
            String columnFamily = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());
            String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
            System.out.printf("[key:%s]	[family:%s] [column:%s] [value:%s]
",
                    key,columnFamily,columnName,value);
        }
    }


    private void getDataByComplexCol(String table,String colPrefix,String minCol,String maxCol) throws IOException{
        Map<String,List<Cell>> map = helper.filterByPrefixAndRange(table,colPrefix,minCol,maxCol);
        for(Map.Entry<String,List<Cell>> entry:map.entrySet()){
            String key = entry.getKey();
            List<Cell> list = entry.getValue();
            dumpCells(key,list);
        }
    }

    public static void main(String[] args) throws IOException{
//        System.out.println("字符编码:"+System.getProperty("file.encoding"));
        MyHBase myHBase = new MyHBase();
        myHBase.setUp();

        //myHBase.initTestTable();
       // myHBase.insertUserData();
//        myHBase.createTableBySplitKey();
//        myHBase.deleteByKey();
//        myHBase.deleteByKeyAndFamily();
//        myHBase.deleteByKeyAndFC();
//        myHBase.bulkInsertTestTable2("testtable2");
       // myHBase.queryAll("user");
//        myHBase.queryAll("testtable");
//        myHBase.dumpTable("testtable2");
  //      myHBase.insertByRowKey("testtable2","rowKey0");
//        myHBase.getDataByRowKey("testtable2","rowKey0");
       // myHBase.getDataByRowKeyFilter("testtable2","Key1$");
       // myHBase.getDataByRowKeyFilter("user","^power");
        myHBase.getDataByValueFilter("user","info","username","战士1");

//        myHBase.getDataByColumnPrefix("user","ad");
        //myHBase.getDataByComplexCol("testtable2","username0","username0","username9");
        myHBase.close();


    }
}
View Code

 具体的可以参考:https://www.cnblogs.com/asker009/p/10626508.html

原文地址:https://www.cnblogs.com/ywjfx/p/13627788.html