OLAP之Druid之批量数据摄入

批量数据摄入

准备工作

  • 确保druid帐号可以访问到位于HDFS之上的原始数据
  • 考虑清楚数据中哪一列可以作为时间列、哪些列可以作为维度列、哪些列可以作为指标列(尤其是指标的聚合函数,包括countsummaxmin等,如果涉及UV、留存的计算,则需要使用HyperUnique或者Theta sketch
  • 考虑最小时间粒度(即queryGranularity)和数据分片的时间粒度(即segmentGranularity),一般在T+1的场景中,这两个值都可以设置为day

提交摄入任务

官方提供的数据摄取JSON如下,可以以此为模版修改(用#开头的是我们添加的注释,正式提交的时候请将注释删除,否则不是合法JSON文件):

{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "quickstart/wikiticker.json"  #原始数据所在的HDFS路径,可以是目录,但目录不可以嵌套;有多个目录可以使用逗号分隔  
      }
    },
    "dataSchema" : {
      "dataSource" : "wikiticker", #导入druid之后的datasource名字,最好是可以识别团队的前缀
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day", #数据分片的时间粒度
        "queryGranularity" : "none",  #最小的查询时间粒度
        "intervals" : ["2015-09-12/2015-09-13"] #本次导入数据所覆盖的时间范围
      },
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json", #原始数据的格式,可以是JSON、CSV、TSV
          "dimensionsSpec" : {
            "dimensions" : [ #在此指定导入数据的维度
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew",
              "isRobot",
              "isUnpatrolled",
              "metroCode",
              "namespace",
              "page",
              "regionIsoCode",
              "regionName",
              "user"
            ]
          },
          "timestampSpec" : { #指定导入数据的时间列以及时间格式
            "format" : "auto",
            "column" : "time"
          }
        }
      },
      "metricsSpec" : [ #指定导入数据的指标列,以及各指标列的聚合函数
        {
          "name" : "count",
          "type" : "count"
        },
        {
          "name" : "added",
          "type" : "longSum",
          "fieldName" : "added" #fieldName是原始数据中的列名,name是在druid中的列名,可以不同
        },
        {
          "name" : "deleted",
          "type" : "longSum",
          "fieldName" : "deleted"
        },
        {
          "name" : "delta",
          "type" : "longSum",
          "fieldName" : "delta"
        },
        {
          "name" : "user_unique",
          "type" : "hyperUnique",
          "fieldName" : "user"
        }
      ]
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {}
    }
  }
}

(注意:如果是csv等需要指定分隔符的文件格式,需要在json文件中,添加delimiter,去指定文件的分隔符。以csv文件格式为例,需要在文件中添加一行 "delimiter":",")

提交命令:

curl -X 'POST' -H 'Content-Type:application/json' -d @my-index-task.json OVERLORD_IP:8090/druid/indexer/v1/task

MultiValueDelimiter

如果原始数据中有需要根据特殊字符进行拆分的dimension,但是每个dimension用来拆分的特殊字符又不一样,可以使用multiValueDelimiter来分别指定。例如:

JSON 格式

delimiter: ","
listDelimiter: null
multiValueDelimiter: {
     "col3": "u0002"
     "col4": "u0003"
}
columns:  col1,col2,col3,col4,col5

input:

"1,2,xu0002yu0002z,au0003bu0003c,eu0002fu0002g"

parsedResult:

{
    "col1": "1",
    "col2": "2",
    "col3": [
            "x",
            "y",
            "z"
      ],
    "col4": [
            "a",
            "b",
            "c"
    ],
    "col5": "eu0002fu0002g"
}

如果只配置listDelimiter,则用listDelimiter进行分割;
如果listDelimiter和multiValueDelimiter同时存在,则以multiValueDelimiter为准;
multiValueDelimiter中不指定分隔符的列,则不对该列进行分割

聚合函数

Count聚合

{ "type" : "count", "name" : <output_name> }
  • Count计算的是Druid导入的数据的行数, 而不是原始数据的行数
  • This is because Druid rolls up data at ingestion time.

Sum聚合

Sum聚合包括longSumdoubleSum

longSum聚合
{ "type" : "longSum", "name" : <output_name>, "fieldName" : <metric_name> }
  • 64bit, 有符号整型
doubleSum
{ "type" : "doubleSum", "name" : <output_name>, "fieldName" : <metric_name> }
  • 64bit, 浮点型

Min/Max聚合

Min/ Max聚合分为double类型和long类型

doubleMin/doubleMax聚合
{ "type" : "doubleMin", "name" : <output_name>, "fieldName" : <metric_name> }
{ "type" : "doubleMax", "name" : <output_name>, "fieldName" : <metric_name> }
  • doubleMin计算所有metric列的值和Double.POSITIVE_INFINITY中的最小值
  • doubleMax计算所有metric列的值和Double.NEGATIVE_INFINITY的最小值
longMin/longMax聚合
{ "type" : "longMin", "name" : <output_name>, "fieldName" : <metric_name> }
{ "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
  • longMin计算所有metric列的值和Long.MAX_VALUE中的最小值
  • longMax计算所有metric列的值和Long.MIN_VALUE中的最大值

HyperUnique聚合

{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
  • 使用HyperLogLog算法计算某个指标的distinct count

Theta Sketch聚合

{ "type" : "thetaSketch", "name" : <output_name>, "fieldName" : <metric_name> }
  • 基于雅虎开源的Theta Sketch框架计算某个指标的distinct count,也可以用作留存分析。详情

Delta Ingestion - 增量数据摄入

当已经加载好的数据,需要再追加新的数据的时候,可以使用这种方法进行提交

json 配置

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "multi",
    "children": [
      {
        "type" : "dataSource",
        "ingestionSpec" : {
          "dataSource": "wikipedia",
          "intervals": ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"],
          "segments": [
            {
              "dataSource": "test1",
              "interval": "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000",
              "version": "v2",
              "loadSpec": {
                "type": "local",
                "path": "/tmp/index1.zip"
              },
              "dimensions": "host",
              "metrics": "visited_sum,unique_hosts",
              "shardSpec": {
                "type": "none"
              },
              "binaryVersion": 9,
              "size": 2,
              "identifier": "test1_2000-01-01T00:00:00.000Z_3000-01-01T00:00:00.000Z_v2"
            }
          ]
        }
      },
      {
        "type" : "static",
        "paths": "/path/to/more/wikipedia/data/"
      }
    ]
  },
  ...
}

在ioConfig 的inputSpec 中,指定要将数据追加到哪里,ingestionSpec中是所有Segments的详细信息,其中:

  1. dataSource指定要追加的dataSource的名字
  2. intervals为Segments的时间范围
  3. segments,这是一个list,可以添加若干个对应的Segments

如上述配置好数据加载json文件以后,执行如下命令,提交task

curl -X 'POST' -H 'Content-Type:application/json' -d @my-index-task.json OVERLORD_IP:8090/druid/indexer/v1/task

注意:新追加的数据,会均匀分布在每个Segments中,但是如果追加到Segments的数据量超过了每个Segments能够承受的大小,不会另外新建Segments,因此,该方法适用于小量的数据追加,避免Segments变的很大。

原文地址:https://www.cnblogs.com/029zz010buct/p/12663425.html