Kafka-Flume-elasticsearch

a1.sources = kafkaSource
a1.channels = memoryChannel
a1.sinks = elasticsearch

a1.sources.kafkaSource.channels = memoryChannel
a1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkaSource.zookeeperConnect = 127.0.0.1:2181
a1.sources.kafkaSource.topic = test1
a1.sources.kafkaSource.groupId = flume


a1.sinks.elasticsearch.channel = memoryChannel
a1.sinks.elasticsearch.type=org.apache.flume.sink.elasticsearch.ElasticSearchSink
a1.sinks.elasticsearch.hostNames=127.0.0.1:9300
a1.sinks.elasticsearch.indexType = bar_type
a1.sinks.elasticsearch.indexName=logstash
a1.sinks.elasticsearch.clusterName=jachs
a1.sinks.elasticsearch.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer


a1.channels.memoryChannel.type=memory
a1.channels.memoryChannel.capacity=10000
a1.channels.memoryChannel.transactionCapacity=1000

 Flume1.6版本,elasticsearch1.5.1版本,将Kafka以及ES的Lib文件夹下所有jar包导入Flume Lib下。

原文地址:https://www.cnblogs.com/zhanchaohan/p/9705845.html