HBase 客户端示例

/**
 * Minimal read/write contract for an HBase client wrapper.
 *
 * <p>Reads hand the raw HBase result(s) to a caller-supplied {@code ResultMapper} and return
 * whatever the mapper produces; writes store raw bytes or pre-built {@code Put} objects.
 */
public interface HBaseOperations {

    /**
     * Reads a single row and maps it.
     *
     * @param tableName  name of the table to read from
     * @param rowKey     key of the row to fetch
     * @param familyName column family name
     * @param mapper     converts the fetched {@code Result} into the return value
     * @param <T>        type produced by the mapper
     * @return the mapper's output
     */
    <T> T get(String tableName,
              String rowKey,
              String familyName,
              ResultMapper<T, Result> mapper);

    /**
     * Reads a single row for a given family and qualifier and maps it.
     *
     * @param tableName  name of the table to read from
     * @param rowKey     key of the row to fetch
     * @param familyName column family name
     * @param qualifier  column qualifier
     * @param mapper     converts the fetched {@code Result} into the return value
     * @param <T>        type produced by the mapper
     * @return the mapper's output
     */
    <T> T get(String tableName,
              String rowKey,
              String familyName,
              String qualifier,
              ResultMapper<T, Result> mapper);

    /**
     * Reads several rows in one call and maps the whole batch.
     *
     * @param tableName  name of the table to read from
     * @param rowKeys    keys of the rows to fetch
     * @param familyName column family name
     * @param mapper     converts the fetched {@code Result[]} into the return value
     * @param <T>        type produced by the mapper
     * @return the mapper's output
     */
    <T> T get(String tableName,
              List<String> rowKeys,
              String familyName,
              ResultMapper<T, Result[]> mapper);

    /**
     * Reads several rows for a given family and qualifier and maps the whole batch.
     *
     * @param tableName  name of the table to read from
     * @param rowKeys    keys of the rows to fetch
     * @param familyName column family name
     * @param qualifier  column qualifier
     * @param mapper     converts the fetched {@code Result[]} into the return value
     * @param <T>        type produced by the mapper
     * @return the mapper's output
     */
    <T> T get(String tableName,
              List<String> rowKeys,
              String familyName,
              String qualifier,
              ResultMapper<T, Result[]> mapper);

    /**
     * Writes one cell.
     *
     * @param tableName  name of the table to write to
     * @param rowKey     key of the row to write
     * @param familyName column family name
     * @param qualifier  column qualifier
     * @param value      raw cell value
     */
    void put(String tableName,
             String rowKey,
             String familyName,
             String qualifier,
             byte[] value);

    /**
     * Writes a batch of pre-built {@code Put} operations.
     *
     * @param tableName name of the table to write to
     * @param putList   the operations to apply
     */
    void put(String tableName, List<Put> putList);

}

一 初始化

public class HBaseTemplate implements HBaseOperations {

    private Connection connection;

    /**
     * Opens the HBase connection held by this template; runs as a {@code @PostConstruct} hook.
     *
     * <p>The ZooKeeper quorum, client port and znode parent are read from {@code HBaseConfig};
     * the retry and timeout values are fixed here.
     *
     * <p>A failure is logged and not rethrown, so {@code connection} remains {@code null}
     * when the connection cannot be created.
     */
    @PostConstruct
    private void connect() {
        log.info("start connecting to HBase");
        try {
            Configuration configuration = HBaseConfiguration.create();

            configuration.set("hbase.zookeeper.quorum", HBaseConfig.getHBaseZkHost());
            configuration.set("hbase.zookeeper.property.clientPort", HBaseConfig.getHBaseZkPort());
            // znode parent, e.g. /ymm_hbase
            configuration.set("zookeeper.znode.parent", HBaseConfig.getHBaseZkZnodeParent());

            // Number of retries for a failed HBase operation.
            configuration.setInt("hbase.client.retries.number", 2);
            // Pause between retries, in milliseconds.
            configuration.setInt("hbase.client.pause", 50);
            // Timeout of a single HBase RPC request, in milliseconds.
            // This statement used to sit on the comment line itself and was therefore never executed.
            configuration.setInt("hbase.rpc.timeout", 1000);
            configuration.setInt("hbase.client.operation.timeout", 5000);

            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            // Start-up continues without a connection; the failure is only logged here.
            log.error("failed to connect HBase", e);
        }
    }

二 get部分

/**
     * Reads one row without specifying a qualifier.
     *
     * <p>Delegates to the qualifier-aware single-row overload, passing {@code null} as the qualifier.
     *
     * @return whatever {@code mapper} produces for the fetched row
     */
    @Override
    public <T> T get(String tableName, String rowKey, String familyName,
                     ResultMapper<T, Result> mapper) {
        final String noQualifier = null;
        return get(tableName, rowKey, familyName, noQualifier, mapper);
    }

    /**
     * Reads one row and maps it.
     *
     * <p>Builds a {@code Get} through {@code buildGet}, runs it against the table inside
     * {@code execute}, and returns the mapper's output for the fetched {@code Result}.
     *
     * @return whatever {@code mapper} produces for the fetched row
     */
    @Override
    public <T> T get(String tableName, String rowKey, String familyName, String qualifier,
                     ResultMapper<T, Result> mapper) {
        return execute(tableName,
                table -> mapper.mapResult(table.get(buildGet(rowKey, familyName, qualifier))));
    }

    /**
     * Reads several rows without specifying a qualifier.
     *
     * <p>Delegates to the qualifier-aware batch overload, passing {@code null} as the qualifier.
     *
     * @return whatever {@code mapper} produces for the fetched rows
     */
    @Override
    public <T> T get(String tableName, List<String> rowKeys, String familyName,
                     ResultMapper<T, Result[]> mapper) {
        final String noQualifier = null;
        return get(tableName, rowKeys, familyName, noQualifier, mapper);
    }

    /**
     * Reads several rows in one table call and maps the whole batch.
     *
     * <p>One {@code Get} is built per row key through {@code buildGet}; the resulting
     * {@code Result[]} is handed to the mapper.
     *
     * @return whatever {@code mapper} produces for the fetched rows
     */
    @Override
    public <T> T get(String tableName, List<String> rowKeys, String familyName, String qualifier,
                     ResultMapper<T, Result[]> mapper) {
        return execute(tableName, table -> {
            List<Get> batch = new ArrayList<>(rowKeys.size());
            for (String key : rowKeys) {
                batch.add(buildGet(key, familyName, qualifier));
            }
            return mapper.mapResult(table.get(batch));
        });
    }

execute

/**
     * Runs a callback against a table and always releases the table afterwards.
     *
     * @param tableName name of the table to open; must not be blank
     * @param action    work to perform with the opened table; must not be {@code null}
     * @param <T>       type returned by the callback
     * @return the callback's result
     * @throws HBaseException if an argument is invalid, the callback throws, or releasing
     *                        the table fails after a successful callback
     */
    private <T> T execute(String tableName, TableCallback<T> action) {
        if (action == null || StringUtils.isBlank(tableName)) {
            throw new HBaseException("execute param error");
        }
        Table table = getTable(tableName);
        HBaseException failure = null;
        try {
            return action.doInTable(table);
        } catch (Throwable t) {
            failure = new HBaseException("invoke HBase failed", t);
            throw failure;
        } finally {
            try {
                releaseTable(table);
            } catch (HBaseException closeFailure) {
                if (failure == null) {
                    // The callback succeeded, so the close failure is the error to report.
                    throw closeFailure;
                }
                // Keep the callback's failure as the primary error instead of letting
                // the close failure replace it.
                failure.addSuppressed(closeFailure);
            }
        }
    }
/**
     * Closes a table handle.
     *
     * @param table the table to close; {@code null} is ignored
     * @throws HBaseException if closing the table raises an {@link IOException}
     */
    public static void releaseTable(Table table) {
        if (table == null) {
            // Nothing to close; avoids a NullPointerException for callers that never got a table.
            return;
        }
        try {
            table.close();
        } catch (IOException e) {
            throw new HBaseException("release table failed", e);
        }
    }
/**
     * Obtains a table handle from the shared connection.
     *
     * @param tableName name of the table to open
     * @return the table handle
     * @throws HBaseException if the connection was never established or the lookup fails
     */
    private Table getTable(String tableName) {
        if (connection == null) {
            // connect() logs and swallows start-up failures, which leaves the field unset;
            // report that explicitly instead of failing with a bare NullPointerException.
            throw new HBaseException("get table failed: HBase connection is not initialized");
        }
        try {
            return connection.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            throw new HBaseException("get table failed", e);
        }
    }

我们这里拿一个实际的例子,看看 ResultMapper 的实现类:

 /**
     * Converts every cell of a fetched row into a {@code WaybillLogEntity}.
     *
     * @param result the fetched row; may be {@code null} or empty
     * @return the converted entities, skipping cells that convert to {@code null};
     *         an empty list when there is nothing to convert
     */
    private List<WaybillLogEntity> mapResult(Result result) {
        List<WaybillLogEntity> entities = Lists.newArrayList();
        if (result != null && !result.isEmpty()) {
            for (Cell cell : result.rawCells()) {
                WaybillLogEntity entity = convert(cell);
                if (entity != null) {
                    entities.add(entity);
                }
            }
        }
        return entities;
    }
/**
     * Deserialises one cell value from JSON into a {@code WaybillLogEntity}.
     *
     * @param cell the cell to convert; may be {@code null}
     * @return the parsed entity, or {@code null} when {@code cell} is {@code null}
     */
    private WaybillLogEntity convert(Cell cell) {
        if (cell == null) {
            return null;
        }
        // Decode explicitly as UTF-8: the charset-less String constructor uses the platform
        // default, which can corrupt non-ASCII JSON text on hosts whose default is not UTF-8.
        String value = new String(CellUtil.cloneValue(cell), java.nio.charset.StandardCharsets.UTF_8);
        return JSON.parseObject(value, WaybillLogEntity.class);
    }

看得出来,公司都是把对象转成 JSON 来传的。

三 put部分

  

/**
     * Writes a single cell.
     *
     * <p>If any text argument is blank or {@code value} is {@code null}, the call only logs
     * at info level and returns without writing.
     */
    @Override
    public void put(String tableName, String rowKey, String familyName, String qualifier, byte[] value) {
        final boolean invalid = StringUtils.isBlank(tableName)
                || StringUtils.isBlank(rowKey)
                || StringUtils.isBlank(familyName)
                || StringUtils.isBlank(qualifier)
                || value == null;
        if (invalid) {
            log.info("put hbase param error");
            return;
        }
        execute(tableName, table -> {
            Put cellWrite = new Put(Bytes.toBytes(rowKey));
            cellWrite.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), value);
            table.put(cellWrite);
            return null;
        });
    }

    /**
     * Writes a batch of pre-built {@code Put} operations in one table call.
     *
     * <p>If the table name is blank or the list is empty, the call only logs at info level
     * and returns without writing.
     */
    @Override
    public void put(String tableName, List<Put> putList) {
        final boolean invalid = StringUtils.isBlank(tableName) || CollectionUtils.isEmpty(putList);
        if (invalid) {
            log.info("put list hbase param error");
            return;
        }
        execute(tableName, table -> {
            table.put(putList);
            return null;
        });
    }
// Example call site: writes one WaybillLogEntity, serialised to JSON bytes, through the single-cell put overload.
hBaseTemplate.put(buildTableName(),
                    buildRowKey(waybillLogEntity.getWaybillId()),
                    buildFamilyName(),
                    buildQualifier(waybillLogEntity),
                    JSONObject.toJSONBytes(waybillLogEntity));
/**
     * Builds the table name in {@code namespace:table} form.
     *
     * @return {@code HBASE_NAMESPACE}, a colon, and the table name supplied by {@code HBaseConfig}
     */
    private String buildTableName() {
        StringBuilder qualifiedName = new StringBuilder();
        qualifiedName.append(HBASE_NAMESPACE);
        qualifiedName.append(":");
        qualifiedName.append(HBaseConfig.getHBaseTableTpWaybillLog());
        return qualifiedName.toString();
    }
HBASE_NAMESPACE 的定义:String HBASE_NAMESPACE = "default";
/**
     * Builds the row key for a waybill: the id modulo 64, a {@code |} separator, then the id.
     *
     * @param waybillId the waybill id
     * @return the row key, e.g. {@code "2|130"} for id 130
     */
    private String buildRowKey(long waybillId) {
        final long bucket = waybillId % 64;
        return new StringBuilder()
                .append(bucket)
                .append("|")
                .append(waybillId)
                .toString();
    }
// Column family name constant. Presumably the value returned by buildFamilyName(), which is not shown here — TODO confirm.
private static final String DEFAULT_FAMILY_NAME = "data";
/**
     * Derives the column qualifier from the entity's id.
     *
     * @param waybillLogEntity the entity being written; may be {@code null}
     * @return the string form of the entity's id, or {@code null} when the entity is {@code null}
     */
    private String buildQualifier(WaybillLogEntity waybillLogEntity) {
        return waybillLogEntity == null
                ? null
                : String.valueOf(waybillLogEntity.getId());
    }
原文地址:https://www.cnblogs.com/juniorMa/p/14838170.html