Logstash配置
input {
kafka {
zk_connect => "127.0.0.1:2181"
topic_id => "cluster"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
if [type]=="cluster3" or [type]=="cluster2" or [type]=="clusterjson"
{
elasticsearch {
hosts => ["localhost:9200"]
index => "test-kafka-%{type}-%{+YYYY-MM}"
}
}
stdout { codec => rubydebug }
}
Server.properties 主要内容
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092
# The port the socket server listens on
port=9092
Server-1.properties 主要内容
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9093
# The port the socket server listens on
port=9093
启动kafka,server.properties和server-1.properties
E:softelkkafka_2.11-0.9.0.1>binwindowskafka-server-start.bat configcustomserver-1.properties
创建topic,并查看状态
查找端口占用情况,并删除一个节点
查看 topic状态
生产者输入json数据,如下
{"Name":"BULK 7","Data":7,"CreateTime":"2016-04-05T10:16:22Z","type":"cluster3"}
Kibana 4显示如下