Logstash 安装并把mysql数据同步到elasticsearch

1. 下载Logstash

由于我的elasticsearch是6.4.3,索引我logstash下载相同版本

https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz

下mysql java jar包

https://dev.mysql.com/downloads/connector/j/5.1.html

通过ftp把两个包上传到linux中

2.安装Logstash

2.1 把解压并且移动到对应安装目录

tar -zxvf logstash-6.4.3.tar.gz
mv logstash-6.4.3 /usr/local/

2.2 在/usr/local/logstash-6.4.3 创建sync目录

mkdir sync

2.3 进入到sync目录,并创建同步配置文件

vim logstash-db-sync.conf

内容如下:

input {
	jdbc {
		# 设置 MySql/MariaDB 数据库url以及数据库名称
		jdbc_connection_string => "jdbc:mysql://192.168.1.6:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect&useSSL=false"
		# 用户名和密码
		jdbc_user => "root"
		jdbc_password => "123456"
		# 数据库驱动所在位置,可以是绝对路径或者相对路径
		jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.49.jar"
		# 驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		# 开启分页
		jdbc_paging_enabled => "true"
		# 分页每页数量,可以自定义
		jdbc_page_size => "1000"
		# 执行的sql文件路径
		statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql"
		# 设置定时任务间隔 含义:分、时、天、月、年 全部为*默认是一分钟跑一次
		schedule => "* * * * *"
		# 索引类型
		type => "_doc"
		# 是否开启记录上次追踪结果
		use_column_value => true
		# 记录上一次追踪的结果值
		last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"
		#
        tracking_column => "updated_time"
        # tracking_column 对应字段的类型		
		tracking_column_type => "timestamp"
		# 是否清楚 last_run_metadata_path记录
		clean_run => false
		# 数据库字段名称大写转小写
		lowercase_column_names => false
		
	}
}	
output {
	elasticsearch {
		# es 地址
		hosts => ["192.168.174.128:9200"]
		index => "foodie-items"
		document_id => "%{itemId}"
	}
	# 日志输出
	stdout {
		codec => json_lines
	}
}

2.4 把mysql jar文件复制到/usr/local/logstash-6.4.3/sync/
2.5 配置sql文件 /usr/local/logstash-6.4.3/sync/foodie-items.sql

SELECT                                                                                             
           i.id as itemId,                                                                                
           i.item_name as itemName,                                                                       
           i.sell_counts as sellCounts,                                                                   
           ii.url as imgUrl,                                                                              
           tempSpec.price_discount as price,
           i.updated_time
       FROM                                                                                               
           items i                                                                                        
       LEFT JOIN                                                                                          
           items_img ii                                                                                   
       ON                                                                                                 
           i.id = ii.item_id                                                                              
       LEFT JOIN                                                                                          
           (                                                                                              
               SELECT                                                                                     
                   item_id, MIN(price_discount) as price_discount                                         
               FROM                                                                                       
                   items_spec                                                                             
               GROUP BY                                                                                   
                   item_id                                                                                
           ) tempSpec                                                                                     
       ON                                                                                                 
           i.id = tempSpec.item_id                                                                        
       WHERE                                                                                              
           ii.is_main = 1
           AND 
           i.updated_time >= :sql_last_value 

3.启动Logstash

3.1 启动Logstash进行同步

./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf

3.2 同步的时候有个问题,建立索引时候默认用的es默认的分词,不适合中文

4.修改logstash同步默认分词插件

4.1 通过postman请求es,获取默认模板

GET
http://192.168.174.128:9200/_template/logstash

内容是

{
    "logstash": {
        "order": 0,
        "version": 60001,
        "index_patterns": [
            "logstash-*"
        ],
        "settings": {
            "index": {
                "refresh_interval": "5s"
            }
        },
        "mappings": {
            "_default_": {
                "dynamic_templates": [
                    {
                        "message_field": {
                            "path_match": "message",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false
                            }
                        }
                    },
                    {
                        "string_fields": {
                            "match": "*",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false,
                                "fields": {
                                    "keyword": {
                                        "type": "keyword",
                                        "ignore_above": 256
                                    }
                                }
                            }
                        }
                    }
                ],
                "properties": {
                    "@timestamp": {
                        "type": "date"
                    },
                    "@version": {
                        "type": "keyword"
                    },
                    "geoip": {
                        "dynamic": true,
                        "properties": {
                            "ip": {
                                "type": "ip"
                            },
                            "location": {
                                "type": "geo_point"
                            },
                            "latitude": {
                                "type": "half_float"
                            },
                            "longitude": {
                                "type": "half_float"
                            }
                        }
                    }
                }
            }
        },
        "aliases": {}
    }
}

4.2 把内容复制下来,修改成下面这样

    {
        "order": 0,
        "version": 1,
        "index_patterns": ["*"],
        "settings": {
            "index": {
                "refresh_interval": "5s"
            }
        },
        "mappings": {
            "_default_": {
                "dynamic_templates": [
                    {
                        "message_field": {
                            "path_match": "message",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false
                            }
                        }
                    },
                    {
                        "string_fields": {
                            "match": "*",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false,
								"analyzer":"ik_max_word",
                                "fields": {
                                    "keyword": {
                                        "type": "keyword",
                                        "ignore_above": 256
                                    }
                                }
                            }
                        }
                    }
                ],
                "properties": {
                    "@timestamp": {
                        "type": "date"
                    },
                    "@version": {
                        "type": "keyword"
                    },
                    "geoip": {
                        "dynamic": true,
                        "properties": {
                            "ip": {
                                "type": "ip"
                            },
                            "location": {
                                "type": "geo_point"
                            },
                            "latitude": {
                                "type": "half_float"
                            },
                            "longitude": {
                                "type": "half_float"
                            }
                        }
                    }
                }
            }
        },
        "aliases": {}
    }

4.3 在Logstash目录创建logstash-ik.json文件,把上面内容复制进去

vim /usr/local/logstash-6.4.3/sync/logstash-ik.json

4.4 在修改Logstash的同步文件

vim /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf

out修改成

output {
        elasticsearch {
                # es 地址
                hosts => ["192.168.174.128:9200"]
                index => "foodie-items"
                document_id => "%{itemId}"

                # 定义模板名称
                template_name => "myik"

                # 模板所在位置
                template => "/usr/local/logstash-6.4.3/sync/logstash-ik.json"
                template_overwrite => true
                # 默认为true, false关闭logstash自动管理模板功能
                manage_template => true
        }
        # 日志输出
        stdout {
                codec => json_lines
        }
}

原文地址:https://www.cnblogs.com/hardy-wang/p/13880369.html