Storing Kafka data directly into ES with Logstash

Download

It is recommended to download the latest version from the official site:
https://www.elastic.co/cn/downloads/logstash
This article uses Logstash 7.0.0:
https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
tar -xzvf logstash-7.0.0.tar.gz
mv logstash-7.0.0 /usr/local/logstash
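A quick sanity check that the install works (assuming the move above) is to print the version:

/usr/local/logstash/bin/logstash --version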



In a Logstash pipeline, input and output are required elements and filter is optional: input plugins consume data from a source, filter plugins modify the data when you specify them, and output plugins write the data to a destination.

We can test this by running the simplest possible Logstash pipeline. The steps are as follows:

Enter the directory where Logstash was extracted and run a pipeline with the -e flag:

cd logstash-7.0.0
./bin/logstash -e 'input { stdin { } } output { stdout {} }'
-e specifies a configuration directly on the command line, which lets you test a configuration quickly without editing a file between iterations. The pipeline in this example takes input from standard input (stdin) and moves it to standard output (stdout) in a structured format.
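Once the pipeline has started, anything typed at the terminal comes back as a structured event, along these lines (an illustrative sketch; the exact fields and values vary by version and host):

hello world
{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => 2019-04-23T08:00:00.000Z,
          "host" => "localhost"
}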

The pipeline below reads from Kafka and writes the data directly to Elasticsearch:
input {
  kafka {
    bootstrap_servers => ["192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092"]
    group_id => "elastic_group"
    topics => ["elastic_test"]
    consumer_threads => 12
    decorate_events => true
  }
}
output {
  elasticsearch {
    hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"]
    index => "elastic_test"
    password => "XXX"
    user => "elastic"
  }
}
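To run it, save the configuration to a file (kafka-to-es.conf below is just an example name) and start Logstash with -f:

bin/logstash -f kafka-to-es.conf

As a rule of thumb from the kafka input plugin's documentation, consumer_threads should ideally match the number of partitions of the topic being consumed.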

  



Here the Logstash version is 5.5.3 and the Kafka version is 2.11. This Logstash version bundles the kafka plugin by default, so it can be configured and used directly without installing the plugin separately. Note that the configuration differs before and after Logstash 5.x, so check carefully, and consult the Elastic website for the latest parameter changes if necessary. For example, before 5.x the kafka plugin was configured with ZooKeeper addresses, while from 5.x onward it is configured with the Kafka broker addresses.
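For contrast, a pre-5.x kafka input looked roughly like this (a sketch based on the old logstash-input-kafka plugin; verify the option names against the docs for your exact version):

input {
  kafka {
    # points at ZooKeeper rather than at the Kafka brokers
    zk_connect => "192.168.110.31:2181"
    group_id => "test"
    topic_id => "logq"
  }
}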

input {
  kafka {
    bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
    client_id => "test"
    group_id => "test"
    auto_offset_reset => "latest"  # start consuming from the latest offset
    consumer_threads => 5
    decorate_events => true        # also brings the current topic, offset, group, partition, etc. into the event
    topics => ["logq","loge"]      # array type; multiple topics can be configured
    type => "bhy"                  # attribute common to all plugins; especially useful when an input has multiple sources
  }
}
With the decorate_events property enabled, watch the information Logstash prints to its console; you will see output like the following:

"kafka":{"consumer_group":"test","partition":0,"offset":10430232,"topic":"logq","key":null}
In addition, several kafka blocks can be configured inside a single input:

input {
  kafka {
    bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
    client_id => "test1"
    group_id => "test1"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["loge"]
    type => "classroom"
  }
  kafka {
    bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
    client_id => "test2"
    group_id => "test2"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["logq"]
    type => "student"
  }
}
If you also need to do other filtering in the filter block, with different operations for each data source defined in input, you can match on the type each source defines:

filter {
  if [type] == "classroom" {
    grok {
      ........
    }
  }
  if [type] == "student" {
    mutate {
      ........
    }
  }
}
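As a purely hypothetical illustration of what might fill those blocks (the grok pattern and the tag below are invented for the example):

filter {
  if [type] == "classroom" {
    grok {
      # hypothetical pattern: extract a room number from lines like "room=101 ..."
      match => { "message" => "room=%{NUMBER:room}" }
    }
  }
  if [type] == "student" {
    mutate {
      # hypothetical: tag events that came from the student topic
      add_tag => ["from_student_topic"]
    }
  }
}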
This is not limited to filter; the same can be done in output. Moreover, when the output is elasticsearch, the type defined in input becomes the document type under the index you define in Elasticsearch.

output {
  if [type] == "classroom" {
    elasticsearch {
      hosts => ["192.168.110.31:9200"]
      index => "school"
      timeout => 300
      user => "elastic"
      password => "changeme"
    }
  }
  if [type] == "student" {
    ........
  }
}
The first is stored in Elasticsearch under the path localhost:9200/school/classroom; the second under localhost:9200/school/student. If no type is ever defined, the default type is logs and the path becomes localhost:9200/school/logs; the default type can also simply be left off.

 

Reading a file and sending it directly to ES

  • Edit /usr/local/logstash/config/logstash-sample.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  #beats {
   # port => 5044
  #}
  file {
    path => "/var/log/httpd/access_log"
    start_position => "beginning"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
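    # note: the @metadata fields referenced below are not set by the file input,
    # so they would appear literally in the index name; a fixed pattern such as
    # "httpd-access-%{+YYYY.MM.dd}" may be a safer choice here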
    index => "%{[@metadata][logstash]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}
  • Check whether the configuration file is correct (assuming the current directory is /usr/local/logstash/config/):
../bin/logstash -t -f logstash-sample.conf
Start it:
../bin/logstash -f logstash-sample.conf
Start with all configuration files in this folder loaded:
../bin/logstash -f ./
Or start it in the background:
nohup ../bin/logstash -f ./ &
    • Common command-line parameters
      -f: specifies a configuration file; Logstash is configured from that file
      -e: followed by a string to be used as the Logstash configuration (with "" the default is stdin as input and stdout as output)
      -l: where Logstash's own log output goes (by default it goes straight to stdout, i.e. the console)
      -t: tests whether the configuration file is correct, then exits.
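Combining a few of these, for example (the log directory here is arbitrary):

../bin/logstash -e 'input { stdin { } } output { stdout { } }' -l /tmp/logstash-logs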
Original article: https://www.cnblogs.com/ExMan/p/14959639.html