elasticsearch-jdbc plugin notes

About the elasticsearch-jdbc plugin:

This plugin synchronizes data from a relational database (MySQL/Oracle) into Elasticsearch.
 

JDBC importer definition file

The general form of a JDBC import specification is a JSON object.


{
    "type" : "jdbc",
    "jdbc" : {
         <definition>
    }
}
 

Example:

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders",
        "index" : "myindex",
        "type" : "mytype",
        ...          
    }
}
 

The importer can either be executed via stdin (for example with echo)


bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '{
  ...
}' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter
 

or with an explicit file name parameter on the command line. Here is an example where statefile.json is a file which is loaded before execution.

java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter \
    statefile.json
 
 

This style is convenient for subsequent executions controlled by the statefile parameter: if statefile is set to statefile.json, the state written by one run is loaded by the next.
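
For example, a definition that enables the state file looks like this (a minimal sketch; everything except the statefile line mirrors the earlier example):

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders",
        "statefile" : "statefile.json"
    }
}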

 

Parameters

"sql" : [
{
"statement" : "select ... from ... where a = ?, b = ?, c = ?",
"parameter" : [ "value for a", "value for b", "value for c" ]
},
{
"statement" : "insert into ... where a = ?, b = ?, c = ?",
"parameter" : [ "value for a", "value for b", "value for c" ],
"write" : "true"
},
{
"statement" : ...
}
]

The following special values can be used as bind parameters:

$now - the current timestamp
$state - the state, one of: BEFORE_FETCH, FETCH, AFTER_FETCH, IDLE, EXCEPTION
$metrics.counter - a counter
$lastrowcount - number of rows from the last statement
$lastexceptiondate - SQL timestamp of the last exception
$lastexception - full stack trace of the last exception
$metrics.lastexecutionstart - SQL timestamp of the time when the last execution started
$metrics.lastexecutionend - SQL timestamp of the time when the last execution ended
$metrics.totalrows - total number of rows fetched
$metrics.totalbytes - total number of bytes fetched
$metrics.failed - total number of failed SQL executions
$metrics.succeeded - total number of succeeded SQL executions
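
For example, $metrics.lastexecutionstart can be bound as a parameter so a statement only fetches rows changed since the previous run (the same pattern appears in the incremental-data example later in this document):

"sql" : [
    {
        "statement" : "select * from products where mytimestamp > ?",
        "parameter" : [ "$metrics.lastexecutionstart" ]
    }
]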

Here is the list of parameters for the jdbc block in the definition.


strategy - the strategy of the JDBC importer, currently implemented: "standard", "column"

url - the JDBC driver URL

user - the JDBC database user

password - the JDBC database password

sql - SQL statement(s), either a string or a list. If a statement ends with .sql, the statement is looked up in the file system. An example of a list of SQL statements follows below.
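
For example, a list can mix read statements with write statements (a sketch; the orders table and its indexed column are hypothetical):

"sql" : [
    {
        "statement" : "select * from orders where indexed = 0"
    },
    {
        "statement" : "update orders set indexed = 1 where indexed = 0",
        "write" : "true"
    }
]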

sql.statement - the SQL statement

sql.write - boolean flag, if true, the SQL statement is interpreted as an insert/update statement that needs write access (default: false)

sql.callable - boolean flag, if true, the SQL statement is interpreted as a JDBC CallableStatement for stored procedures (default: false)

sql.parameter - bind parameters for the SQL statement (in order). The special values listed above can be used here.

locale - the default locale (used for parsing numerical values and floating point characters; recommended value is "en_US")

timezone - the timezone for JDBC setTimestamp() calls when binding parameters with timestamp values

rounding - rounding mode for parsing numeric values. Possible values: "ceiling", "down", "floor", "halfdown", "halfeven", "halfup", "unnecessary", "up"

scale - the precision for parsing numeric values

autocommit - true if each statement should be automatically executed (default: false)

fetchsize - the fetch size for large result sets; most drivers use this to control the number of rows in the buffer while iterating through the result set

max_rows - limit the number of rows fetched by a statement; the remaining rows are ignored

max_retries - the number of retries to (re)connect to a database

max_retries_wait - a time value for the time that should be waited between retries (default: "30s")

resultset_type - the JDBC result set type, can be TYPE_FORWARD_ONLY, TYPE_SCROLL_SENSITIVE, TYPE_SCROLL_INSENSITIVE (default: TYPE_FORWARD_ONLY)

resultset_concurrency - the JDBC result set concurrency, can be CONCUR_READ_ONLY, CONCUR_UPDATABLE (default: CONCUR_UPDATABLE)

ignore_null_values - if NULL values should be ignored when constructing JSON documents (default: false)

detect_geo - if geo polygons / points in SQL columns should be parsed when constructing JSON documents (default: true)

detect_json - if JSON structures in SQL columns should be parsed when constructing JSON documents (default: true)

prepare_database_metadata - if the driver metadata should be prepared as parameters (default: false)

prepare_resultset_metadata - if the result set metadata should be prepared as parameters (default: false)

column_name_map - a map of aliases that should be used as a replacement for column names of the database. Useful for Oracle's 30-character column name limit (default: null)

query_timeout - a value in seconds for how long an SQL statement is allowed to execute before it is considered lost (default: 1800)

connection_properties - a map of connection properties for driver connection creation (default: null)

schedule - a single cron expression, or a list of them, for scheduled execution. Syntax is equivalent to the Quartz cron expression format (see below for syntax)

threadpoolsize - the thread pool size for scheduled executions of the schedule parameter. If set to 1, all jobs will be executed serially (default: 4)

interval - a time value for the delay between two runs (default: not set)

elasticsearch.cluster - Elasticsearch cluster name

elasticsearch.host - array of Elasticsearch host specifications (host name or host:port)

elasticsearch.port - port of the Elasticsearch host

elasticsearch.autodiscover - if true, the JDBC importer will try to connect to all cluster nodes (default: false)

max_bulk_actions - the length of each bulk index request submitted (default: 10000)

max_concurrent_bulk_requests - the maximum number of concurrent bulk requests (default: 2 * number of CPU cores)

max_bulk_volume - a byte size parameter for the maximum volume allowed for a bulk request (default: "10m")

max_request_wait - a time value for the maximum wait time for a response to a bulk request (default: "60s")

flush_interval - a time value for the interval period of flushing index docs to a bulk action (default: "5s")

index - the Elasticsearch index used for indexing

type - the Elasticsearch type of the index used for indexing

index_settings - optional settings for the Elasticsearch index

type_mapping - optional mapping for the Elasticsearch index type

statefile - name of a file where the JDBC importer reads or writes state information

metrics.lastexecutionstart - the UTC date/time of the beginning of the last execution of a single fetch

metrics.lastexecutionend - the UTC date/time of the end of the last execution of a single fetch

metrics.counter - a counter for metrics, incremented after each single fetch

metrics.enabled - if true, metrics logging is enabled (default: false)

metrics.interval - the interval between metrics logging (default: 30 seconds)

metrics.logger.plain - if true, write metrics log messages in plain text format (default: false)

metrics.logger.json - if true, write metrics log messages in JSON format (default: false)
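
Putting several of these together, a definition with scheduling, Elasticsearch connection settings, and bulk tuning might look like this (a sketch; the host and the tuning values are illustrative):

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders",
        "schedule" : "0 0-59 0-23 ? * *",
        "elasticsearch.cluster" : "elasticsearch",
        "elasticsearch.host" : [ "localhost:9300" ],
        "max_bulk_actions" : 20000,
        "flush_interval" : "5s",
        "index" : "myindex",
        "type" : "mytype"
    }
}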

 
Here are the default parameter settings:
{
    "jdbc" : {
        "strategy" : "standard",
        "url" : null,
        "user" : null,
        "password" : null,
        "sql" : null,
        "locale" : /* equivalent to Locale.getDefault().toLanguageTag() */,
        "timezone" : /* equivalent to TimeZone.getDefault() */,
        "rounding" : null,
        "scale" : 2,
        "autocommit" : false,
        "fetchsize" : 10, /* if URL contains MySQL JDBC driver URL, this is Integer.MIN */
        "max_rows" : 0,
        "max_retries" : 3,
        "max_retries_wait" : "30s",
        "resultset_type" : "TYPE_FORWARD_ONLY",
        "resultset_concurreny" : "CONCUR_UPDATABLE",
        "ignore_null_values" : false,
        "prepare_database_metadata" : false,
        "prepare_resultset_metadata" : false,
        "column_name_map" : null,
        "query_timeout" : 1800,
        "connection_properties" : null,
        "schedule" : null,
        "interval" : 0L,
        "threadpoolsize" : 1,
        "index" : "jdbc",
        "type" : "jdbc",
        "index_settings" : null,
        "type_mapping" : null,
        "max_bulk_actions" : 10000,
        "max_concurrent_bulk_requests" : 2 * available CPU cores,
        "max_bulk_volume" : "10m",
        "max_request_wait" : "60s",
        "flush_interval" : "5s"
    }
}
 
 

Time scheduled execution

Setting a cron expression in the parameter schedule enables repeated (or time scheduled) runs.

You can also define a list of cron expressions (in a JSON array) to schedule for many different time schedules.

Example of a schedule parameter:

    "schedule" : "0 0-59 0-23 ? * *"

This executes the JDBC importer every minute of every hour, on every day of the week/month/year (that is, at second 0 of every minute).
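
The list form goes directly into the definition, for example to add an extra weekday morning run (the second expression is illustrative):

    "schedule" : [ "0 0-59 0-23 ? * *", "0 0 6 ? * MON-FRI" ]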


 


The following documentation about the syntax of the cron expression is copied from the Quartz scheduler javadoc page.

Cron expressions provide the ability to specify complex time combinations such as "At 8:00am every Monday through Friday" or "At 1:30am every last Friday of the month".

Cron expressions are comprised of 6 required fields and one optional field separated by white space. The fields respectively are described as follows:

Field Name       Allowed Values      Allowed Special Characters
Seconds          0-59                , - * /
Minutes          0-59                , - * /
Hours            0-23                , - * /
Day-of-month     1-31                , - * ? / L W
Month            1-12 or JAN-DEC     , - * /
Day-of-Week      1-7 or SUN-SAT      , - * ? / L #
Year (Optional)  empty, 1970-2199    , - * /

The '*' character is used to specify all values. For example, "*" in the minute field means "every minute".

The '?' character is allowed for the day-of-month and day-of-week fields. It is used to specify 'no specific value'. This is useful when you need to specify something in one of the two fields, but not the other.

The '-' character is used to specify ranges. For example, "10-12" in the hour field means "the hours 10, 11 and 12".

The ',' character is used to specify additional values. For example "MON,WED,FRI" in the day-of-week field means "the days Monday, Wednesday, and Friday".

The '/' character is used to specify increments. For example "0/15" in the seconds field means "the seconds 0, 15, 30, and 45". And "5/15" in the seconds field means "the seconds 5, 20, 35, and 50". Specifying '*' before the '/' is equivalent to specifying 0 is the value to start with. Essentially, for each field in the expression, there is a set of numbers that can be turned on or off. For seconds and minutes, the numbers range from 0 to 59. For hours 0 to 23, for days of the month 0 to 31, and for months 1 to 12. The "/" character simply helps you turn on every "nth" value in the given set. Thus "7/6" in the month field only turns on month "7", it does NOT mean every 6th month, please note that subtlety.

The 'L' character is allowed for the day-of-month and day-of-week fields. This character is short-hand for "last", but it has different meaning in each of the two fields. For example, the value "L" in the day-of-month field means "the last day of the month" - day 31 for January, day 28 for February on non-leap years. If used in the day-of-week field by itself, it simply means "7" or "SAT". But if used in the day-of-week field after another value, it means "the last xxx day of the month" - for example "6L" means "the last friday of the month". You can also specify an offset from the last day of the month, such as "L-3" which would mean the third-to-last day of the calendar month. When using the 'L' option, it is important not to specify lists, or ranges of values, as you'll get confusing/unexpected results.

The 'W' character is allowed for the day-of-month field. This character is used to specify the weekday (Monday-Friday) nearest the given day. As an example, if you were to specify "15W" as the value for the day-of-month field, the meaning is: "the nearest weekday to the 15th of the month". So if the 15th is a Saturday, the trigger will fire on Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th. However if you specify "1W" as the value for day-of-month, and the 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not 'jump' over the boundary of a month's days. The 'W' character can only be specified when the day-of-month is a single day, not a range or list of days.

The 'L' and 'W' characters can also be combined for the day-of-month expression to yield 'LW', which translates to "last weekday of the month".

The '#' character is allowed for the day-of-week field. This character is used to specify "the nth" XXX day of the month. For example, the value of "6#3" in the day-of-week field means the third Friday of the month (day 6 = Friday and "#3" = the 3rd one in the month). Other examples: "2#1" = the first Monday of the month and "4#5" = the fifth Wednesday of the month. Note that if you specify "#5" and there is not 5 of the given day-of-week in the month, then no firing will occur that month. If the '#' character is used, there can only be one expression in the day-of-week field ("3#1,6#3" is not valid, since there are two expressions).

The legal characters and the names of months and days of the week are not case sensitive.

Note: Support for specifying both a day-of-week and a day-of-month value is not complete (you'll need to use the '?' character in one of these fields). Overflowing ranges is supported - that is, having a larger number on the left hand side than the right. You might do 22-2 to catch 10 o'clock at night until 2 o'clock in the morning, or you might have NOV-FEB. It is very important to note that overuse of overflowing ranges creates ranges that don't make sense and no effort has been made to determine which interpretation CronExpression chooses. An example would be "0 0 14-6 ? * FRI-MON".
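
The two time combinations quoted at the start of this documentation translate to the following expressions (shown here in schedule form):

    "schedule" : [ "0 0 8 ? * MON-FRI", "0 30 1 ? * 6L" ]

where "0 0 8 ? * MON-FRI" is "at 8:00am every Monday through Friday" and "0 30 1 ? * 6L" is "at 1:30am every last Friday of the month".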

 

Structured objects

One of the advantages of SQL queries is the join operation. From many tables, new tuples can be formed.

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select \"relations\" as \"_index\", orders.customer as \"_id\", orders.customer as \"contact.customer\", employees.name as \"contact.employee\" from orders left join employees on employees.department = orders.department order by _id"
    }
}
 
For example, these rows from SQL
mysql> select "relations" as "_index", orders.customer as "_id", orders.customer as "contact.customer", employees.name as "contact.employee"  from orders left join employees on employees.department = orders.department order by _id;
+-----------+-------+------------------+------------------+
| _index    | _id   | contact.customer | contact.employee |
+-----------+-------+------------------+------------------+
| relations | Big   | Big              | Smith            |
| relations | Large | Large            | Müller           |
| relations | Large | Large            | Meier            |
| relations | Large | Large            | Schulze          |
| relations | Huge  | Huge             | Müller           |
| relations | Huge  | Huge             | Meier            |
| relations | Huge  | Huge             | Schulze          |
| relations | Good  | Good             | Müller           |
| relations | Good  | Good             | Meier            |
| relations | Good  | Good             | Schulze          |
| relations | Bad   | Bad              | Jones            |
+-----------+-------+------------------+------------------+
11 rows in set (0.00 sec)
 
will generate fewer JSON objects for the index relations.
index=relations id=Big {"contact":{"employee":"Smith","customer":"Big"}}
index=relations id=Large {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Large"}}
index=relations id=Huge {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Huge"}}
index=relations id=Good {"contact":{"employee":["Müller","Meier","Schulze"],"customer":"Good"}}
index=relations id=Bad {"contact":{"employee":"Jones","customer":"Bad"}}
 
Note how the employee column is collapsed into a JSON array. The repeated occurrence of the _id column controls how values are folded into arrays for making use of the Elasticsearch JSON data model. Make sure your SQL query is ordered by _id.
 

Column names for JSON document construction

In SQL, each column may be labeled. This label is used by the JDBC importer for JSON document construction. The dot is the path separator for the document structure.

For example

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select products.name as \"product.name\", orders.customer as \"product.customer.name\", orders.quantity * products.price as \"product.customer.bill\" from products, orders where products.name = orders.product"
    }
}
 

the labeled columns are product.name, product.customer.name, and product.customer.bill.

A data example:

mysql> select products.name as "product.name", orders.customer as "product.customer", orders.quantity * products.price as "product.customer.bill" from products, orders where products.name = orders.product ;
+--------------+------------------+-----------------------+
| product.name | product.customer | product.customer.bill |
+--------------+------------------+-----------------------+
| Apples       | Big              |                     1 |
| Bananas      | Large            |                     2 |
| Oranges      | Huge             |                     6 |
| Apples       | Good             |                     2 |
| Oranges      | Bad              |                     9 |
+--------------+------------------+-----------------------+
5 rows in set, 5 warnings (0.00 sec)
 
 

The structured objects constructed from these columns are

id=0 {"product":{"name":"Apples","customer":{"bill":1.0,"name":"Big"}}}
id=1 {"product":{"name":"Bananas","customer":{"bill":2.0,"name":"Large"}}}
id=2 {"product":{"name":"Oranges","customer":{"bill":6.0,"name":"Huge"}}}
id=3 {"product":{"name":"Apples","customer":{"bill":2.0,"name":"Good"}}}
id=4 {"product":{"name":"Oranges","customer":{"bill":9.0,"name":"Bad"}}}
 
 

There are column labels with an underscore as prefix that are mapped to special Elasticsearch document parameters for indexing:

_index     the index this object should be indexed into
_type      the type this object should be indexed into
_id        the id of this object
_version   the version of this object
_parent    the parent of this object
_ttl       the time-to-live of this object
_routing   the routing of this object
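
For example, a query can set the target index and the document id per row by labeling columns accordingly (a sketch; myindex and the products table are hypothetical):

    "sql" : "select \"myindex\" as \"_index\", id as \"_id\", name from products"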
 
 

See also

http://www.elasticsearch.org/guide/reference/mapping/parent-field.html

http://www.elasticsearch.org/guide/reference/mapping/ttl-field.html

http://www.elasticsearch.org/guide/reference/mapping/routing-field.html

 

Bracket notation for JSON array construction

When constructing JSON documents, it is often the case that you want to group SQL columns into a JSON object and line them up into JSON arrays. To allow this, a bracket notation is used to identify child elements that repeat in each child.


Note, because of limitations in identifying SQL column groups, nested document structures may lead to repetitions of the same group. Fortunately, this is harmless to Elasticsearch queries.


Example:

_id   blog.name  blog.published       blog.association[id]  blog.association[name]  blog.attachment[id]  blog.attachment[name]
4679  Joe        2014-01-06 00:00:00  3917                  John                    9450                 /web/q/g/h/57436356.jpg
4679  Joe        2014-01-06 00:00:00  3917                  John                    9965                 /web/i/s/q/GS3193626.jpg
4679  Joe        2014-01-06 00:00:00  3917                  John                    9451                 /web/i/s/q/GS3193626.jpg

Result:

{
    "blog" : {
        "attachment": [
            {
                "name" : "/web/q/g/h/57436356.jpg",
                "id" : "9450"
            },
            {
                "name" : "/web/i/s/q/GS3193626.jpg",
                "id" : "9965"
            },
            {
                "name" : "/web/i/s/q/GS3193626.jpg",
                "id" : "9451"
            }
        ],
        "name" : "Joe",
        "association" : [
            {
                "name" : "John",
                "id" : "3917"
            },
            {
                "name" : "John",
                "id" : "3917"
            },
            {
                "name" : "John",
                "id" : "3917"
            }
         ],
         "published":"2014-01-06 00:00:00"
     }
}
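
A query that produces the columns above might look like this (a sketch; the blog, association, and attachment tables and their join columns are hypothetical):

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select blog.id as \"_id\", blog.name as \"blog.name\", blog.published as \"blog.published\", a.id as \"blog.association[id]\", a.name as \"blog.association[name]\", t.id as \"blog.attachment[id]\", t.name as \"blog.attachment[name]\" from blog left join association a on a.blog_id = blog.id left join attachment t on t.blog_id = blog.id"
    }
}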
 

How to fetch a table?

For fetching a table, a "select *" (star) query can be used. Star queries are the simplest variant of selecting data from a database. They dump tables into Elasticsearch row-by-row. If no _id column name is given, IDs will be automatically generated.


For example

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders"
    }
}
 

and this table

mysql> select * from orders;
+----------+-----------------+---------+----------+---------------------+
| customer | department      | product | quantity | created             |
+----------+-----------------+---------+----------+---------------------+
| Big      | American Fruits | Apples  |        1 | 0000-00-00 00:00:00 |
| Large    | German Fruits   | Bananas |        1 | 0000-00-00 00:00:00 |
| Huge     | German Fruits   | Oranges |        2 | 0000-00-00 00:00:00 |
| Good     | German Fruits   | Apples  |        2 | 2012-06-01 00:00:00 |
| Bad      | English Fruits  | Oranges |        3 | 2012-06-01 00:00:00 |
+----------+-----------------+---------+----------+---------------------+
5 rows in set (0.00 sec)
 

will result in the following JSON documents

id=<random> {"product":"Apples","created":null,"department":"American Fruits","quantity":1,"customer":"Big"}
id=<random> {"product":"Bananas","created":null,"department":"German Fruits","quantity":1,"customer":"Large"}
id=<random> {"product":"Oranges","created":null,"department":"German Fruits","quantity":2,"customer":"Huge"}
id=<random> {"product":"Apples","created":1338501600000,"department":"German Fruits","quantity":2,"customer":"Good"}
id=<random> {"product":"Oranges","created":1338501600000,"department":"English Fruits","quantity":3,"customer":"Bad"}
 

How to update a table?

The JDBC importer allows writing data back into the database for maintenance purposes.

Writing data back into the database makes sense for acknowledging fetched data.

Example:

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : [
            {
                "statement" : "select * from "products""
            },
            {
                "statement" : "delete from "products" where "_job" = ?",
                "parameter" : [ "$job" ]
            }
        ],
        "index" : "my_jdbc_index",
        "type" : "my_jdbc_type"
    }
}
 

In this example, the DB administrator has prepared product rows and attached a _job column to enumerate the product updates incrementally. The assertion is that all products should be deleted from the database after they have been indexed successfully. The parameter $job is a counter. The importer state is saved in a file, so the counter is persisted.

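The fetch side of such a setup could select only the rows marked for the current job (a sketch; that $job can be bound in a read statement this way is an assumption here):

"sql" : [
    {
        "statement" : "select * from \"products\" where \"_job\" = ?",
        "parameter" : [ "$job" ]  /* assumption: $job binds on reads as well as writes */
    }
]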

How to select incremental data from a table?

It is recommended to use timestamps in UTC for synchronization. This example fetches all product rows which have been added since the last run, using a column mytimestamp with millisecond resolution:

{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "statefile" : "statefile.json",
        "user" : "",
        "password" : "",
        "sql" : [
            {
                "statement" : "select * from products where mytimestamp > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ],
        "index" : "my_jdbc_index",
        "type" : "my_jdbc_type"
    }
}
 

The first time you run the importer, it will generate the statefile.json file like this

{
  "type" : "jdbc",
  "jdbc" : { 
    "password" : "",
    "index" : "my_jdbc_index",
    "statefile" : "statefile.json",
    "metrics" : { 
      "lastexecutionstart" : "2016-03-27T06:37:09.165Z",
      "lastexecutionend" : "2016-03-27T06:37:09.501Z",
      "counter" : "1" 
    },  
    "type" : "my_jdbc_type",
    "user" : "",
    "url" : "jdbc:mysql://localhost:3306/test",
    "sql" : [ { 
      "statement" : "select * from products where mytimestamp > ?", 
      "parameter" : [ "$metrics.lastexecutionstart" ]
    } ] 
  }
}
 

After this, each subsequent run selects only the incremental data from the table.

Stored procedures or callable statements

Stored procedures can also be used for fetching data, as this example for MySQL illustrates. See also "Using Stored Procedures" in the Java tutorials, from where the example is taken.

create procedure GET_SUPPLIER_OF_COFFEE(
    IN coffeeName varchar(32), 
    OUT supplierName varchar(40)) 
    begin 
        select SUPPLIERS.SUP_NAME into supplierName 
        from SUPPLIERS, COFFEES 
        where SUPPLIERS.SUP_ID = COFFEES.SUP_ID 
        and coffeeName = COFFEES.COF_NAME; 
        select supplierName; 
    end

Now it is possible to call the procedure from the JDBC importer and index the result in Elasticsearch.

{
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : [
            {
                "callable" : true,
                "statement" : "{call GET_SUPPLIER_OF_COFFEE(?,?)}",
                "parameter" : [
                     "Colombian"
                ],
                "register" : {
                     "mySupplierName" : { "pos" : 2, "type" : "varchar" }
                }
            }
        ],
        "index" : "my_jdbc_index",
        "type" : "my_jdbc_type"
    }
}

Note, the parameter lists the input parameters in the order they should be applied, like in an ordinary statement. The register declares a list of output parameters in the particular order the pos number indicates. It is required to declare the JDBC type in the type attribute. mySupplierName, the key of the output parameter, is used as the Elasticsearch field name specification, like the column name specification in an ordinary SQL statement, because column names are not available in callable statement result sets.

If more than one result set is returned by a callable statement, the JDBC importer enters a loop and iterates through all result sets.


 
 
 
 
 
 
 
 
 
 
 
 
 



This article originally appeared on the WeChat public account 刍荛采葑菲; please credit the source when reposting.

Original: https://www.cnblogs.com/churao/p/8509641.html