Input模块插件的详细介绍

jdbc插件

  1. 增量同步和全量同步:
    1. 全量同步:是指将全部数据同步到es(通常是刚建立es,第一次同步时使用)
    2. 增量同步:是指将后续的更新、插入的记录同步到es
  2. 增量同步更新的注意点:
    1. tracking_column的字段必须是递增的字段(如id或者时间字段)
    2. tracking_column字段是id,则只能增加数据,而无法修改数据
    3. tracking_column字段是时间字段(如event_date),才可以实现同步修改操作
  3. 参数说明:
    1. record_last_run:是否记录上次执行结果;如果为真,将会把上次执行到的tracking_column字段的最后一行的值记录下来,保存到 last_run_metadata_path
    2. use_column_value:是否需要记录某个column 的值,record_last_run 为真,可以自定义 track _column 名称, 否则默认 track_column 的是 timestamp 的值
    3. tracking_column:如果 use_column_value 为真,需配置此参数;这个参数就是数据库给出的一个字段名称;该 column 必须是递增的。比如:记录时间的字段或者是自增的id
    4. last_run_metadata_path:指定文件,来记录上次执行到的tracking_column 字段的最后一行的值;可以在SQL语句中运用该值(WHERE MY_ID > :last_sql_value);last_sql_value 取得就是该文件中的值
    5. clean_run:是否清除 last_run_metadata_path 的记录;如果为真,那么每次都相当于从头开始查询所有的数据库记录
    6. lowercase_column_names:是否将 column 名称转小写
    7. statement:执行SQL语句
    8. statement_filepath:可以将SQL语句写入某个文件中,statement_filepath就是执行SQL文件的路径
    9. schedule:设置监听间隔,多久执行一次
      1. 从左到右的含义:分、时、日、月、年
      2. schedule => "* * * * *":全部为*默认含义为每分钟都更新
      3. schedule => "40 12 30 7 2020":年份不能写具体的年份,不然会报错: Error: invalid weekday expression (2020) 
      4. schedule => "40 12 30 7 *"
  4. 注意点:
    1. SQL查询语句是一次读取一批数据,tracking_column对应的字段中最后一行的值存储在last_run_metadata_path对应的文件中,而不是tracking_column对应的字段值中最大的数据存储在last_run_metadata_path对应的文件中
  5. 时区问题:
    1. UTC:称为通用协调时间;英国伦敦的本地时相同
    2. UTC + 时区差=本地时间
    3. 北京时区是东八区,领先UTC 8个小时
    4. logstash和ES使用的UTC时间
  6. 全量同步实例:
    input {
                jdbc {
                    # mysql 数据库链接,test为数据库名:"jdbc:mysql://192.168.29.1:3306/test?useUnicode=true&characterEncoding=utf8"
                    jdbc_connection_string => "jdbc:mysql://192.168.29.1:3306/test"
                    # 用户名和密码
                    jdbc_user => "root"
                    jdbc_password => "root"
                    # 驱动
                    jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
                    # 驱动类名
                    jdbc_driver_class => "com.mysql.jdbc.Driver"
                    #是否分页
                    jdbc_paging_enabled => "true"
                    jdbc_page_size => "50000"
                    #直接执行sql语句
                    statement =>"select * from MdL_001"
                    # 执行的sql 文件路径+名称
                    #statement_filepath => "/opt/data/jdbc.sql"
                    # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
                    schedule => "* * * * *"
                    # 索引类型
                    type => "jdbc"
                }
    }
  7. 增量同步实例:
    input {
        jdbc {
            # mysql 数据库链接,test为数据库名
            jdbc_connection_string => "jdbc:mysql://192.168.29.1:3306/test"
            # 用户名和密码
            jdbc_user => "root"
            jdbc_password => "root"
            # 驱动
            jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            #处理中文乱码问题
            codec => plain { charset => "UTF-8"}
            #使用其它字段追踪,而不是用时间
            use_column_value => true
            #追踪的字段(使用MySQL中的时间字段而不使用自增的id字段;因为自增的id字段无法实现更新问题,因为后面需要更新的id值会小于last_run_metadata_path记录的值而无法实现更新操作)
            #tracking_column => id
            tracking_column => event_date
            record_last_run => true
            #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
            last_run_metadata_path => "/opt/data/station_parameter.txt"
            #开启分页查询
            jdbc_paging_enabled => true
            jdbc_page_size => 50000
            #直接执行sql语句
            statement =>"select * from mdl_001 id > :sql_last_value"
            # 执行的sql 文件路径+名称
            #statement_filepath => "/opt/data/jdbc.sql"
            # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
            schedule => "* * * * *"
            # 索引类型
            type => "jdbc"
        }
    }

file插件

Redis插件

原文地址:https://www.cnblogs.com/WeiKing/p/13448993.html