Kudu基本操作及概念

Kudu:
    针对 Apache Hadoop 平台而开发的列式存储管理器。

使用场景:
    适用于那些既有随机访问,也有批量数据扫描的复合场景。
    高计算量的场景。
    使用了高性能的存储设备,包括使用更多的内存。
    支持数据更新,避免数据反复迁移。
    支持跨地域的实时数据备份和查询。
    
kudu的关键机制:
1.模仿数据库,以二维表的形式组织数据,创建表的时候需要指定schema。所以只支持结构化数据。

2.每个表指定一个或多个主键。

3.支持insert/update/delete,这些修改操作全部要指定主键。

4.read操作,只支持scan原语。

5.一致性模型,默认支持snapshot ,这个可以保证scan和单个客户端 read-you-writes一致性保证。更强的一致性保证,提供manually propagate timestamps between clients或者commit-wait。

6.cluster类似hbase简单的M-S结构,master支持备份。

7.单个表支持水平分割,partitions叫tablets,单行一定在一个tablets里面,支持范围,以及list等更灵活的分区键。

8.使用Raft 协议,可以根据SLA指定备份块数量。

9.列式存储

10.delta flushes,数据先更新到内存中,最后在合并到最终存储中,有专门到后台进程负责。

11.Lazy Materialization ,对一些选择性谓词,可以帮助跳过很多不必要的数据。

12.支持和MR/SPARK/IMPALA等集成,支持Locality ,Columnar Projection ,Predicate pushdown 等。


注意:
1、建表的时候要求所有的tserver节点都活着
2、根据raft机制,允许(replication的副本数-)/ 2宕掉,集群还会正常运行,否则会报错找不到ip:7050(7050是rpc的通信端口号),需要注意一个问题,第一次运行的时候要保证集群处于正常状态下,也就是所有的服务都启动,如果运行过程中,允许(replication的副本数-)/ 2宕掉
3、读操作,只要有一台活着的情况下,就可以运行



maven 依赖:

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>1.7.0</version>
        </dependency>





Java 代码:

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @Author:Xavier
 * @Data:2019-02-22 09:25
 **/


public class KuduOption {
    // master地址
    private static final String KUDU_MASTER = "nn02:7051";

    private static String tableName = "KuduTest";

    //创建表
    @Test
    public void CreateTab() {
        // 创建kudu的数据库链接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            // 设置表的schema(模式)
            List<ColumnSchema> columns = new ArrayList(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
            Schema schema = new Schema(columns);

            //创建表时提供的所有选项
            CreateTableOptions options = new CreateTableOptions();

            // 设置表的replica备份和分区规则
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");

            // 一个replica
            options.setNumReplicas(1);
            // 用列rangeKeys做为分区的参照
            options.setRangePartitionColumns(rangeKeys);
            client.createTable(tableName, schema, options);

            // 添加key的hash分区
            //options.addHashPartitions(parcols, 3);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //向表内插入新数据
    @Test
    public void InsertData() {
        // 创建kudu的数据库链接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            // 打开表
            KuduTable table = client.openTable(tableName);
            // 创建写session,kudu必须通过session写入
            KuduSession session = client.newSession();

            // 采取Flush方式 手动刷新
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.setMutationBufferSpace(3000);

            System.out.println("-------start--------" + System.currentTimeMillis());

            for (int i = 1; i < 6100; i++) {
                Insert insert = table.newInsert();
                // 设置字段内容
                PartialRow row = insert.getRow();
                row.addString("key", i+"");
                row.addString(1, "value"+i);
                session.flush();
                session.apply(insert);
            }
            System.out.println("-------end--------" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //更新数据
    @Test
    public void kuduUpdateTest() {
        // 创建kudu的数据库链接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();

            Update update = table.newUpdate();
            PartialRow row = update.getRow();

            //
            row.addString("key", 998 + "");
            row.addString("value", "updata Data " + 10);
            session.flush();
            session.apply(update);

//            System.out.print(operationResponse.getRowError());

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    //根据主键删除数据
    @Test
    public void deleteData(){
        KuduClient client=new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table=client.openTable(tableName);
            KuduSession session=client.newSession();

            Delete delete=table.newDelete();
            PartialRow row=delete.getRow();
            row.addString("key","992");

            session.apply(delete);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    //扫描数据
    @Test
    public void SearchData() {
        // 创建kudu的数据库链接
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            KuduTable table = client.openTable(tableName);

            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    //条件扫描数据
    @Test
    public void searchDataByCondition(){
        KuduClient client =new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            KuduTable table=client.openTable(tableName);

            KuduScanner.KuduScannerBuilder scannerBuilder=client.newScannerBuilder(table);

            //设置搜索的条件
            KuduPredicate predicate=KuduPredicate.
                    newComparisonPredicate(
                            table.getSchema().getColumn("key"),//设置要值的谓词(字段)
                            KuduPredicate.ComparisonOp.EQUAL,//设置搜索逻辑
                            "991");//设置搜索条件值
            scannerBuilder.addPredicate(predicate);

            // 开始扫描
            KuduScanner scanner=scannerBuilder.build();
            while(scanner.hasMoreRows()){
                RowResultIterator iterator=scanner.nextRows();
                while(iterator.hasNext()){
                    RowResult result=iterator.next();
                    System.out.println("输出: "+result.getString(0)+"--"+result.getString("value"));
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    //删除表
    @Test
    public void DelTab() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    //
    @Test
    public void searchBysparkSql() {
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset ds = sparkSession.read().format("org.apache.kudu.spark.kudu").
                schema(schema).option("kudu.master", "nn02:7051").option("kudu.table", "KuduTest").load();
        ds.registerTempTable("abc");
        sparkSession.sql("select * from abc").show();
    }

    @Test
    public void checkTableExistByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("172.19.224.213:7051", sparkSession.sparkContext());
        System.out.println(tableName + " is exist = " + context.tableExists(tableName));
    }

    public SparkSession getSparkSession() {
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");

        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        SparkSession sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate();
        return sparkSession;
    }
}





原文地址:https://www.cnblogs.com/xavier-xd/p/10417805.html