python操作hbase

# -*- coding: utf-8 -*-

import happybase
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase

host = 'ip'
port = 9090
rowkey = ""

'''
happybase为python操作habse的常用第三方模块,但不支持habse的删除操作。
HbaseUtil: 封装hbase模块,实现 表的查询、删除
HappyHbaseUtil:封装happybase模块,实现创建表、插入、删除rawkey、查找

'''


def ExceptionHabase(function):
    try:
        def warpper(*args, **kwargs):
            function(*args, **kwargs)
    except Exception as exp:
        return exp
    else:
        return warpper


class HbaseUtil(object):

    def __init__(self):
        self.transport = TBufferedTransport(TSocket(host, port))
        self.transport.open()
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)

    def __del__(self):
        self.transport.close()

    def show_tables(self):
        '''
        查看表中的所有的表
        :return:
        '''
        return self.client.getTableNames()

    @ExceptionHabase
    def delete_table(self, tablename):
        '''
        删除hbase中的表,如果表在占用则停止掉进行删除,否则直接删除
        :param tablename: 表名
        :return:
        '''
        if self.client.isTableEnabled(tablename):
            self.client.disableTable(tablename)
        self.client.deleteTable(tablename)
        return "sucess delete {tablename}".format(tablename=tablename)


class HappyHbaseUtil(object):

    def __init__(self):
        self.connection = happybase.Connection(host)
        self.connection.open()

    def __del__(self):
        self.connection.close()

    @ExceptionHabase
    def create_table(self, tablename=None, families=None):
        '''
        :param tablename:
        :param families:
        :return:
        '''
        if families == None:
            families = dict()
        self.connection.create_table(name=tablename, families=families)
        return "suceess create table {tablename}".format(tablename=tablename)

    @ExceptionHabase
    def insert(self, tablename, data=None, rawkey=None, batch_size=10):
        '''
        :param tablename:  表名
        :param data:   插入数据 数据类型为dict
        :param rawkey:  指定rawkey
        :return:
        '''
        if not isinstance(data, dict):
            import json
            data = json.dumps(data)
        table = self.connection.table(tablename)
        with table.batch(batch_size=batch_size) as bat:
            bat.put(rawkey, data=data)
        return "success"

    @ExceptionHabase
    def delete_rawkey(self, tablename, rawkey,families=None):
        '''
        :param tablename:表名
        :param rawkey:删除的rawkey
        :return:
        '''
        table = self.connection.table(tablename)
        with table.batch() as bat:
            bat.delete(rawkey,columns=families)
        return "success delete {rawkey}".format(rawkey=rawkey)

    @ExceptionHabase
    def select_rawkey(self, tablename, rawkey,families=None):
        '''
        :param tablename:表名
        :param rawkey: 查找的rawkey
        :param families:查找的列族 类型为List  ['cf1:price', 'cf1:rating'])
        :return:
        '''
        table = self.connection.table(tablename)
        return table.row(rawkey,columns=families)


    @ExceptionHabase
    def select_rawkey_list(self,tablename,rawkeylist,families=None):
        '''
        :param tablename: 查询的表名
        :param rawkeylist: 查询的rawkey列表,以list的形式传入
        :param families:列族 类型为list  ['cf1:price', 'cf1:rating']
        :return:
        '''
        table = self.connection.table(tablename)
        return dict(table.rows(rawkeylist,columns=families))



if __name__ == "__main__":
    ##使用连接池##
    hb = HappyHbaseUtil()
    pool = happybase.ConnectionPool(size=10, host=host)
    with pool.connection() as connection:
        setattr(hb,'connection',connection)
    hb.select_rawkey(tablename='habase',rawkey='123')
原文地址:https://www.cnblogs.com/tigerzhouv587/p/11284623.html