Logstash过滤日志输出到MySQL

安装包下载地址:https://artifacts.elastic.co/downloads/logstash/

1、安装jdklogstash启动需要依赖java环境

# tar zxf jdk-8u191-linux-x64.tar.gz  -C  /usr/local/
# cd /usr/local/jdk1.8.0_191/
# pwd
/usr/local/jdk1.8.0_191
# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export PATH=$PATH:$JAVA_HOME/bin
# source /etc/profile
# java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

2、解压logstash

# tar zxf logstash-6.3.2.tar.gz -C /usr/local/
# cd /usr/local/logstash-6.3.2/

 3安装logstash-outpot-jdbc插件

*****在线安装
# /usr/local/logstash-6.3.2/bin/logstash-plugin install logstash-output-jdbc

补充:把安装的logstash-output-jdbc打包成zip

#  /usr/local/logstash-6.3.2/bin/logstash-plugin prepare-offline-pack --overwrite --output logstash-output-jdbc.zip logstash-output-jdbc

****离线安装

# /usr/local/logstash-6.3.2/bin/logstash-plugin  install  file:///root/logstash-output-jdbc.zip

4、jdbc插件依赖mysql-connector-java-5.1.37下载安装

# mkdir /usr/local/logstash-6.3.2/jdbc
# unzip mysql-connector-java-5.1.37.zip -d /usr/local/logstash-6.3.2/jdbc

 

5、创建logstash配置文件

# mkdir /usr/local/logstash-6.3.2/conf.d
# vim /usr/local/logstash-6.3.2/conf.d/logged.conf
input {
    file{
        path => "/root/jupyterhub.log"    #从jupyterhub.log中读取
        type => "log"
        start_position => "beginning"     
        sincedb_path => "/dev/null"        #从头开始读取
    }
}

filter {
    if [message] !~ "^[" {
        drop {}      #日志不是以[开头的就过滤掉
    }
    grok{         #正则匹配
        match => {
            "message" => "[(?<class>.*)]s(?<info>.*)"
        }
    }
    if [info] !~ "^User logged" {
        drop {}     #上面正则匹配字段info的内容不是以User logged开头的就过滤掉
    }
    grok{        #开始正则匹配
        match => {
            "message" => "[(?<head>[A-Z]{1})s(?<date>d{4}-d{2}-d{2}sd{2}:d{2}:d{2}).d{3}s(?<msg>.*)]s(?<status>.*):s(?<name>.*)"
       }
    }
    if [status] == "User logged in" {
        mutate {
            gsub => [ "status",'User logged in','login' ] #以上正则匹配字段status的内容如果是User logged in,就把它替换为login
        }
    }
    if [status] == "User logged out" {
         mutate {
            gsub => [ "status",'User logged out','logout' ] #以上正则匹配字段status的内容如果是User logged out,就把它替换为logout
        }
   }
}

output {     #输出到MySQL
    jdbc {
        driver_jar_path => "/usr/local/logstash-6.3.2/jdbc/mysql-connector-java-5.1.37/mysql-connector-java-5.1.37-bin.jar"   #指定jdbc插件依赖安装目录
        driver_class => "com.mysql.jdbc.Driver"
        connection_string => "jdbc:mysql://172.16.43.164:3306/logstash?user=root&password=123"   #指定连接mysql的信息
        statement => [ "INSERT INTO logged(date,type,username) values (?,?,?)","%{date}" ,"%{status}","%{name}" ] #从过滤的数据插入数据库表中
  }
}

6、mysql中创建logstash库和logged

mysql>create database logstash;
mysql>create table logged( 
      id int not null auto_increment,
      date varchar(50) not null,
      type varchar(20) not null,
      username varchar(20) not null, 
      PRIMARY KEY (id));

 

7、启动logstash

# pwd
/usr/local/logstash-6.3.2/conf.d
# ../bin/logstash  -f  logged.conf &

 8数据库验证

select * from logstash.logged;

原文地址:https://www.cnblogs.com/lina-2159/p/13674378.html