使用logstash从Kafka中拉取数据并传输给elasticsearch且创建相应索引的操作

注意事项:默认Kafka传递给elastci的数据是在'data'字段,且不包含其他数据,所以需要使用额外的操作进行处理

logstash配置文件操作

input {

  kafka {
    bootstrap_servers => "172.17.107.187:9092,172.17.107.187:9093,172.17.107.187:9094"  # 字符串形式,kafka集群地址
    auto_offset_reset => "latest" # 拉取最近数据
    consumer_threads => 5 # 使用的线程数
    decorate_events => true   # 传递给elastci的数据增加附加数据
    topics => ["test_canal_topic"] # 拉取的kafka的指定topic
    tags => ["canal"] # 标签,额外使用该参数可以在elastci中创建不同索引
  }
  
}


filter {
  # 把默认的data字段重命名为message字段,方便在elastic中显示
  mutate {
    rename => ["data", "message"]
  }
  
  # 还可以使用其他的处理方式,在此就不再列出来了
}

output {

  elasticsearch {
    hosts => ["http://172.17.107.187:9203", "http://172.17.107.187:9201","http://172.17.107.187:9202"]
    index => "filebeat_%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}" # decorate_events=true的作用,可以使用metadata中的数据
    user => "elastic"
    password => "escluter123456"
  }

}

原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/11634352.html