ELK: Kafka JSON to ELK

Logstash configuration

   

input {
  kafka {
    zk_connect => "127.0.0.1:2181"
    topic_id => "cluster"
    # Parse each message as JSON so that [type] is available to the output conditional.
    codec => json
    reset_beginning => false
    consumer_threads => 5
    decorate_events => true
  }
}

   

   

output {
  if [type] == "cluster3" or [type] == "cluster2" or [type] == "clusterjson" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "test-kafka-%{type}-%{+YYYY-MM}"
    }
  }

  stdout { codec => rubydebug }
}
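
To make the routing in the output block concrete, here is a minimal Python sketch of what the conditional and the index pattern amount to. It is not Logstash code: `ALLOWED_TYPES`, `index_for`, and the sample timestamp are names and values chosen for illustration. It assumes that `%{+YYYY-MM}` is filled from the event's `@timestamp` (in UTC), not from the `CreateTime` field inside the message.

```python
from datetime import datetime, timezone
from typing import Optional

# Types accepted by the conditional in the output block.
ALLOWED_TYPES = {"cluster3", "cluster2", "clusterjson"}


def index_for(event: dict, timestamp: datetime) -> Optional[str]:
    """Return the index an event would be written to, or None if it is skipped.

    `timestamp` stands in for the event's @timestamp field.
    """
    event_type = event.get("type")
    if event_type not in ALLOWED_TYPES:
        return None  # only the stdout output would see this event
    # Mirrors index => "test-kafka-%{type}-%{+YYYY-MM}"
    return "test-kafka-{}-{}".format(event_type, timestamp.strftime("%Y-%m"))


if __name__ == "__main__":
    ts = datetime(2016, 4, 5, 10, 16, 22, tzinfo=timezone.utc)
    print(index_for({"Name": "BULK 7", "type": "cluster3"}, ts))
    print(index_for({"Name": "other", "type": "cluster9"}, ts))
```

For a `cluster3` event stamped April 2016 this gives `test-kafka-cluster3-2016-04`. An event with any other type gets no index and only reaches the `stdout` output.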

   

Main contents of server.properties

broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
port=9092

   

   

Main contents of server-1.properties

broker.id=1

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9093

# The port the socket server listens on
port=9093
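
Both brokers appear to run on the same machine, so the two files must not share a `broker.id` or a port. A small Python sketch of that check follows. `parse_properties` and `conflicting_keys` are illustrative helpers, not part of Kafka, and the inline strings stand in for the two files.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def conflicting_keys(a: dict, b: dict, keys=("broker.id", "listeners", "port")) -> list:
    """Return the keys from `keys` that are set to the same value in both files."""
    return [k for k in keys if k in a and k in b and a[k] == b[k]]


SERVER = """
broker.id=0
listeners=PLAINTEXT://:9092
port=9092
"""

SERVER_1 = """
broker.id=1
listeners=PLAINTEXT://:9093
port=9093
"""

if __name__ == "__main__":
    clashes = conflicting_keys(parse_properties(SERVER), parse_properties(SERVER_1))
    print(clashes)  # an empty list means the two brokers do not collide
```

In a real setup the `log.dirs` setting, which the excerpts here do not show, also needs to differ between brokers on one host.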

   

   

Start Kafka with server.properties and server-1.properties

E:\soft\elk\kafka_2.11-0.9.0.1>bin\windows\kafka-server-start.bat config\custom\server-1.properties

   

Create the topic and check its status

   

   

Find which process is using the port, then take down one node

   

   

Check the topic status

   

   

   

Enter JSON data in the producer, as follows:

   

{"Name":"BULK 7","Data":7,"CreateTime":"2016-04-05T10:16:22Z","type":"cluster3"}
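
Assuming the console producer is used, each input line becomes one Kafka message, so the JSON has to stay on a single line. As a rough sketch, the message above could be generated in Python like this. `make_message` is a hypothetical helper, and the field names are taken from the sample.

```python
import json
from datetime import datetime, timezone


def make_message(name: str, data: int, event_type: str, created: datetime) -> str:
    """Serialize one event as a single-line JSON string.

    `created` is expected to be a UTC datetime.
    """
    payload = {
        "Name": name,
        "Data": data,
        # Same ISO 8601 UTC format as the sample message.
        "CreateTime": created.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "type": event_type,
    }
    # Compact separators keep the output free of extra whitespace.
    return json.dumps(payload, separators=(",", ":"))


if __name__ == "__main__":
    created = datetime(2016, 4, 5, 10, 16, 22, tzinfo=timezone.utc)
    print(make_message("BULK 7", 7, "cluster3", created))
```

The `type` value matters here because the Logstash output only indexes events whose type is `cluster3`, `cluster2`, or `clusterjson`.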

   

Kibana 4 displays the following:

   

   

   

   

   

   

   

Original article: https://www.cnblogs.com/liuyuhua/p/5356293.html