Logstash: shipping logs from Kafka to ES

Use Logstash to consume data from Kafka and write it into ES.

Kafka cluster

10.10.10.60:9092
10.10.10.61:9092
10.10.10.62:9092
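Before pointing Logstash at the cluster, it can be worth confirming the brokers and topics are reachable from the Logstash host. A minimal check with the standard Kafka CLI tools (a sketch; it assumes the Kafka scripts are available on the host and uses a topic name from the config below):

# List the topics visible on the cluster
bin/kafka-topics.sh --bootstrap-server 10.10.10.60:9092 --list

# Peek at a few messages from one of the topics consumed below
bin/kafka-console-consumer.sh --bootstrap-server 10.10.10.60:9092 \
    --topic newgamexxx --from-beginning --max-messages 5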

F5 (load balancer) address of the ES cluster

10.10.10.64:9200
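A quick way to confirm the ES endpoint is reachable and the elastic credentials work is a cluster-health request (a sketch; substitute the password used in the matching output block below):

curl -u elastic:'<password>' 'http://10.10.10.64:9200/_cluster/health?pretty'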

Logstash configuration file: kafka2es.conf

input {
    kafka {
        bootstrap_servers => "10.10.10.60:9092, 10.10.10.61:9092, 10.10.10.62:9092"
        topics => ["newgamexxx","newgameyyy","newgamezzz","newgamewww"]
        group_id => "xxx_prod_game_sitexx"
        auto_offset_reset => "earliest"
        codec => "json"
        consumer_threads => 10
        client_id => "client_site1"
        max_poll_records => "150"
        max_poll_interval_ms => "600000"
        heartbeat_interval_ms => 2000
    }
}


# Debug-mode test input
#input {
#  stdin {
#  codec => json 
#  }
#}


output {
    # Debug-mode test output
    #stdout {
    # codec => rubydebug
    #}

    # Business 01 - F5
    elasticsearch {
        index  =>  "%{es_index_name2}"
        document_id => "%{id}"
        hosts => ["10.10.10.61:9200"]
        user => "elastic"
        password => "wpasswordxxxy"
        codec => json
    }
    # Business 02 - F5
    elasticsearch {
        index  =>  "%{es_index_name2}"
        document_id => "%{id}"
        hosts => ["10.10.10.64:9200"]
        user => "elastic"
        password => "twpasswordxxxyD"
        codec => json
    }
}
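Once kafka2es.conf is in place, its syntax can be checked before starting the pipeline (a sketch using standard Logstash command-line flags; --config.test_and_exit only validates the config and then exits):

# Check kafka2es.conf for syntax errors without starting the pipeline
bin/logstash -f kafka2es.conf --config.test_and_exit

# Start the pipeline in the foreground
bin/logstash -f kafka2es.conf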

Check connectivity from Logstash to ES

Comment out the production (Kafka) input above and use the input below instead.

Turn on debug mode to check that it works:

input {
  stdin {
    codec => json
  }
}
output {
    stdout {
        codec => rubydebug
    }
}
bin/logstash -f test_grok.conf --verbose --debug

Note that --verbose --debug must be appended at the end of the command.

Starting Logstash in the foreground this way lets you see the log output directly in the terminal.
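Once it is running, pasting a JSON line into the terminal should print the parsed event back via rubydebug. A hypothetical sample document (the es_index_name2 and id fields are the ones the production elasticsearch outputs reference):

{"id":"test-001","es_index_name2":"newgamexxx-2020.11.23","msg":"hello"}

If the event is echoed back with @timestamp and @version fields added, the stdin/stdout path and the JSON codec are working.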

Original article: https://www.cnblogs.com/carry00/p/14021672.html